This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8b31a0ed4 [java] KUDU-2671 support creating table with custom hash
schema per range
8b31a0ed4 is described below
commit 8b31a0ed48a8ee830c68dc74143c924f4b1f2116
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Dec 2 21:07:44 2021 -0800
[java] KUDU-2671 support creating table with custom hash schema per range
This patch introduces changes in the Kudu Java client API to make it
possible to create a Kudu table with custom hash bucket schemas per
range partition. Corresponding test coverage is present as well.
This is a patch to complement 586b79132 at the Kudu Java client side
(586b79132 introduced corresponding changes at the Kudu C++ client).
The appropriate Spark bindings haven't been updated yet -- that
is to be done in a separate changelist.
Change-Id: I5ccf77ea2c39808520e76351d62571d449d10894
Reviewed-on: http://gerrit.cloudera.org:8080/18562
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Zoltan Chovan <[email protected]>
Reviewed-by: Attila Bukor <[email protected]>
---
.../org/apache/kudu/client/AsyncKuduClient.java | 4 +-
.../org/apache/kudu/client/CreateTableOptions.java | 99 ++-
.../java/org/apache/kudu/client/KeyEncoder.java | 21 +-
.../java/org/apache/kudu/client/Operation.java | 24 +-
.../org/apache/kudu/client/PartitionPruner.java | 312 +++++--
.../org/apache/kudu/client/PartitionSchema.java | 163 +++-
.../org/apache/kudu/client/ProtobufHelper.java | 53 +-
.../org/apache/kudu/client/RangePartition.java | 66 ++
.../client/RangePartitionWithCustomHashSchema.java | 87 ++
.../org/apache/kudu/client/TestKeyEncoding.java | 59 +-
.../org/apache/kudu/client/TestKuduClient.java | 103 +++
.../java/org/apache/kudu/client/TestKuduTable.java | 902 +++++++++++++++++++++
.../java/org/apache/kudu/client/TestOperation.java | 26 +-
.../apache/kudu/client/TestPartitionPruner.java | 219 +++++
14 files changed, 1969 insertions(+), 169 deletions(-)
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 3a0780c14..1b1a690ca 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -633,8 +633,8 @@ public class AsyncKuduClient implements AutoCloseable {
if (builder == null) {
throw new IllegalArgumentException("CreateTableOptions may not be null");
}
- if (!builder.getBuilder().getPartitionSchema().hasRangeSchema() &&
- builder.getBuilder().getPartitionSchema().getHashSchemaCount() == 0) {
+ final Common.PartitionSchemaPB ps =
builder.getBuilder().getPartitionSchema();
+ if (!ps.hasRangeSchema() && ps.getHashSchemaCount() == 0) {
throw new IllegalArgumentException("Table partitioning must be specified
using " +
"setRangePartitionColumns or
addHashPartitions");
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableOptions.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableOptions.java
index 6ac83dd62..e1d35360c 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableOptions.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableOptions.java
@@ -38,8 +38,11 @@ public class CreateTableOptions {
private final List<PartialRow> splitRows = Lists.newArrayList();
private final List<RangePartition> rangePartitions = Lists.newArrayList();
+ private final List<RangePartitionWithCustomHashSchema> customRangePartitions
=
+ Lists.newArrayList(); // range partitions with custom hash schemas
private Master.CreateTableRequestPB.Builder pb =
Master.CreateTableRequestPB.newBuilder();
private boolean wait = true;
+ private boolean isPbGenerationDone = false;
/**
* Add a set of hash partitions to the table.
@@ -164,6 +167,23 @@ public class CreateTableOptions {
return this;
}
+ /**
+ * Add range partition with custom hash schema.
+ *
+ * @param rangePartition range partition with custom hash schema
+ * @return this CreateTableOptions object modified accordingly
+ */
+ public CreateTableOptions
addRangePartition(RangePartitionWithCustomHashSchema rangePartition) {
+ if (!splitRows.isEmpty()) {
+ throw new IllegalArgumentException(
+ "no range partitions with custom hash schema are allowed when using
" +
+ "split rows to define range partitioning for a table");
+ }
+ customRangePartitions.add(rangePartition);
+
pb.getPartitionSchemaBuilder().addCustomHashSchemaRanges(rangePartition.toPB());
+ return this;
+ }
+
/**
* Add a range partition split. The split row must fall in a range partition,
* and causes the range partition to split into two contiguous range
partitions.
@@ -174,6 +194,11 @@ public class CreateTableOptions {
* @return this instance
*/
public CreateTableOptions addSplitRow(PartialRow row) {
+ if (!customRangePartitions.isEmpty()) {
+ throw new IllegalArgumentException(
+ "no split rows are allowed to define range partitioning for a table
" +
+ "when range partitions with custom hash schema are present");
+ }
splitRows.add(new PartialRow(row));
return this;
}
@@ -271,55 +296,51 @@ public class CreateTableOptions {
}
Master.CreateTableRequestPB.Builder getBuilder() {
- if (!splitRows.isEmpty() || !rangePartitions.isEmpty()) {
- pb.setSplitRowsRangeBounds(new Operation.OperationsEncoder()
-
.encodeRangePartitions(rangePartitions, splitRows));
+ if (isPbGenerationDone) {
+ return pb;
+ }
+
+ if (!splitRows.isEmpty() && !customRangePartitions.isEmpty()) {
+ throw new IllegalArgumentException(
+ "no split rows are allowed to define range partitioning for a table
" +
+ "when range partitions with custom hash schema are present");
}
+ if (customRangePartitions.isEmpty()) {
+ if (!splitRows.isEmpty() || !rangePartitions.isEmpty()) {
+ pb.setSplitRowsRangeBounds(new Operation.OperationsEncoder()
+ .encodeRangePartitions(rangePartitions, splitRows));
+ }
+ } else {
+ // With the presence of a range with custom hash schema when the
+ // table-wide hash schema is used for a particular range, add proper
+ // element into PartitionSchemaPB::custom_hash_schema_ranges to satisfy
+ // the convention used by the backend. Do so for all the ranges with
+ // table-wide hash schemas.
+ for (RangePartition p : rangePartitions) {
+ org.apache.kudu.Common.PartitionSchemaPB.RangeWithHashSchemaPB.Builder
b =
+ pb.getPartitionSchemaBuilder().addCustomHashSchemaRangesBuilder();
+ // Set the hash schema for the range.
+ for (org.apache.kudu.Common.PartitionSchemaPB.HashBucketSchemaPB
hashSchema :
+ pb.getPartitionSchemaBuilder().getHashSchemaList()) {
+ b.addHashSchema(hashSchema);
+ }
+ b.setRangeBounds(
+ new Operation.OperationsEncoder().encodeLowerAndUpperBounds(
+ p.lowerBound, p.upperBound, p.lowerBoundType,
p.upperBoundType));
+ }
+ }
+ isPbGenerationDone = true;
return pb;
}
List<Integer> getRequiredFeatureFlags() {
- if (rangePartitions.isEmpty()) {
+ if (rangePartitions.isEmpty() && customRangePartitions.isEmpty()) {
return ImmutableList.of();
- } else {
- return
ImmutableList.of(Master.MasterFeatures.RANGE_PARTITION_BOUNDS_VALUE);
}
+ return
ImmutableList.of(Master.MasterFeatures.RANGE_PARTITION_BOUNDS_VALUE);
}
boolean shouldWait() {
return wait;
}
-
- static final class RangePartition {
- private final PartialRow lowerBound;
- private final PartialRow upperBound;
- private final RangePartitionBound lowerBoundType;
- private final RangePartitionBound upperBoundType;
-
- public RangePartition(PartialRow lowerBound,
- PartialRow upperBound,
- RangePartitionBound lowerBoundType,
- RangePartitionBound upperBoundType) {
- this.lowerBound = lowerBound;
- this.upperBound = upperBound;
- this.lowerBoundType = lowerBoundType;
- this.upperBoundType = upperBoundType;
- }
-
- public PartialRow getLowerBound() {
- return lowerBound;
- }
-
- public PartialRow getUpperBound() {
- return upperBound;
- }
-
- public RangePartitionBound getLowerBoundType() {
- return lowerBoundType;
- }
-
- public RangePartitionBound getUpperBoundType() {
- return upperBoundType;
- }
- }
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
index 0e3500e22..638df2bf3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
@@ -92,14 +92,18 @@ class KeyEncoder {
* @return an encoded partition key
*/
public static byte[] encodePartitionKey(PartialRow row, PartitionSchema
partitionSchema) {
+ ByteVec rangeBuf = ByteVec.create();
+ encodeColumns(row, partitionSchema.getRangeSchema().getColumnIds(),
rangeBuf);
+
+ // Get the hash bucket schema for the range.
+ final List<HashBucketSchema> hashSchemas =
+ partitionSchema.getHashSchemaForRange(rangeBuf.toArray());
ByteVec buf = ByteVec.create();
- if (!partitionSchema.getHashBucketSchemas().isEmpty()) {
- for (final HashBucketSchema hashSchema :
partitionSchema.getHashBucketSchemas()) {
- encodeHashBucket(getHashBucket(row, hashSchema), buf);
- }
+ for (final HashBucketSchema hashSchema : hashSchemas) {
+ encodeHashBucket(getHashBucket(row, hashSchema), buf);
}
- encodeColumns(row, partitionSchema.getRangeSchema().getColumnIds(), buf);
+ buf.append(rangeBuf);
return buf.toArray();
}
@@ -252,9 +256,9 @@ class KeyEncoder {
ByteBuffer buf = ByteBuffer.wrap(key);
buf.order(ByteOrder.BIG_ENDIAN);
+ final List<HashBucketSchema> hashSchemas =
partitionSchema.getHashSchemaForRange(key);
List<Integer> buckets = new ArrayList<>();
-
- for (int i = 0; i < partitionSchema.getHashBucketSchemas().size(); i++) {
+ for (int i = 0; i < hashSchemas.size(); i++) {
if (buf.hasRemaining()) {
buckets.add(buf.getInt());
} else {
@@ -448,7 +452,8 @@ class KeyEncoder {
byte[] lowerBound,
byte[] upperBound) {
if (partitionSchema.getRangeSchema().getColumnIds().isEmpty() &&
- partitionSchema.getHashBucketSchemas().isEmpty()) {
+ partitionSchema.getHashBucketSchemas().isEmpty() &&
+ partitionSchema.getRangesWithHashSchemas().isEmpty()) {
assert lowerBound.length == 0 && upperBound.length == 0;
return "<no-partitioning>";
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 0b2b933fa..fdd636a31 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -36,7 +36,6 @@ import org.apache.yetus.audience.InterfaceStability;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.RowOperations.RowOperationsPB;
import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
import org.apache.kudu.WireProtocol.AppStatusPB.ErrorCode;
import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
import org.apache.kudu.client.Statistics.Statistic;
@@ -435,7 +434,7 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
}
public RowOperationsPB encodeRangePartitions(
- List<CreateTableOptions.RangePartition> rangePartitions,
+ List<RangePartition> rangePartitions,
List<PartialRow> splitRows) {
if (splitRows.isEmpty() && rangePartitions.isEmpty()) {
@@ -450,15 +449,15 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
encodeRow(row, ChangeType.SPLIT_ROWS);
}
- for (CreateTableOptions.RangePartition partition : rangePartitions) {
+ for (RangePartition partition : rangePartitions) {
encodeRow(partition.getLowerBound(),
- partition.getLowerBoundType() ==
RangePartitionBound.INCLUSIVE_BOUND ?
- ChangeType.RANGE_LOWER_BOUND :
- ChangeType.EXCLUSIVE_RANGE_LOWER_BOUND);
+ partition.getLowerBoundType() ==
RangePartitionBound.INCLUSIVE_BOUND ?
+ ChangeType.RANGE_LOWER_BOUND :
+ ChangeType.EXCLUSIVE_RANGE_LOWER_BOUND);
encodeRow(partition.getUpperBound(),
- partition.getUpperBoundType() ==
RangePartitionBound.EXCLUSIVE_BOUND ?
- ChangeType.RANGE_UPPER_BOUND :
- ChangeType.INCLUSIVE_RANGE_UPPER_BOUND);
+ partition.getUpperBoundType() ==
RangePartitionBound.EXCLUSIVE_BOUND ?
+ ChangeType.RANGE_UPPER_BOUND :
+ ChangeType.INCLUSIVE_RANGE_UPPER_BOUND);
}
return toPB();
@@ -505,7 +504,7 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
* @param schema the table schema to use for decoding
 * @return a list of RangePartition objects with corresponding bounds
*/
- public List<CreateTableOptions.RangePartition> decodeRangePartitions(
+ public List<RangePartition> decodeRangePartitions(
RowOperationsPB pb, Schema schema) {
if (pb == null) {
return null;
@@ -531,7 +530,7 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
"unexpected odd number of range partition bounds");
}
- List<CreateTableOptions.RangePartition> result = new ArrayList<>();
+ List<RangePartition> result = new ArrayList<>();
for (int i = 0; i < decodedBounds.size(); i += 2) {
Pair<PartialRow, RowOperationsPB.Type> lower = decodedBounds.get(i);
Pair<PartialRow, RowOperationsPB.Type> upper = decodedBounds.get(i +
1);
@@ -556,8 +555,7 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
"%s: unexpected bound type for the upper bound",
upper.getSecond().toString()));
}
- result.add(new CreateTableOptions.RangePartition(
- lower.getFirst(), upper.getFirst(), lowerType, upperType));
+ result.add(new RangePartition(lower.getFirst(), upper.getFirst(),
lowerType, upperType));
}
return result;
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
index 8785cc454..5b74d6e9e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
@@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.kudu.ColumnSchema;
@@ -43,7 +45,7 @@ public class PartitionPruner {
/**
* Constructs a new partition pruner.
- * @param rangePartitions the valid partition key ranges, in reverse sorted
order
+ * @param rangePartitions the valid partition key ranges, sorted in
ascending order
*/
private PartitionPruner(Deque<Pair<byte[], byte[]>> rangePartitions) {
this.rangePartitions = rangePartitions;
@@ -70,11 +72,11 @@ public class PartitionPruner {
*/
public static PartitionPruner create(AbstractKuduScannerBuilder<?, ?>
scanner) {
Schema schema = scanner.table.getSchema();
- PartitionSchema partitionSchema = scanner.table.getPartitionSchema();
+ final PartitionSchema partitionSchema = scanner.table.getPartitionSchema();
PartitionSchema.RangeSchema rangeSchema = partitionSchema.getRangeSchema();
Map<String, KuduPredicate> predicates = scanner.predicates;
- // Check if the scan can be short circuited entirely by checking the
primary
+ // Check if the scan can be short-circuited entirely by checking the
primary
// key bounds and predicates. This also allows us to assume some
invariants of the
// scan, such as no None predicates and that the lower bound PK < upper
// bound PK.
@@ -160,6 +162,7 @@ public class PartitionPruner {
// key bounds, if they are tighter.
byte[] rangeLowerBound = pushPredsIntoLowerBoundRangeKey(schema,
rangeSchema, predicates);
byte[] rangeUpperBound = pushPredsIntoUpperBoundRangeKey(schema,
rangeSchema, predicates);
+
if (partitionSchema.isSimpleRangePartitioning()) {
if (Bytes.memcmp(rangeLowerBound, scanner.lowerBoundPrimaryKey) < 0) {
rangeLowerBound = scanner.lowerBoundPrimaryKey;
@@ -170,100 +173,112 @@ public class PartitionPruner {
rangeUpperBound = scanner.upperBoundPrimaryKey;
}
}
+ // Since the table can contain range-specific hash schemas, it's necessary
+ // to split the original range into sub-ranges where each subrange comes
+ // with appropriate hash schema.
+ List<PartitionSchema.EncodedRangeBoundsWithHashSchema> preliminaryRanges =
+ splitIntoHashSpecificRanges(rangeLowerBound, rangeUpperBound,
partitionSchema);
+
+ List<Pair<byte[], byte[]>> partitionKeyRangeBytes = new ArrayList<>();
+
+ for (PartitionSchema.EncodedRangeBoundsWithHashSchema preliminaryRange :
preliminaryRanges) {
+ // Step 2: Create the hash bucket portion of the partition key.
+ final List<PartitionSchema.HashBucketSchema> hashBucketSchemas =
+ preliminaryRange.hashSchemas;
+ // List of pruned hash buckets per hash component.
+ List<BitSet> hashComponents = new ArrayList<>(hashBucketSchemas.size());
+ for (PartitionSchema.HashBucketSchema hashSchema : hashBucketSchemas) {
+ hashComponents.add(pruneHashComponent(schema, hashSchema, predicates));
+ }
- // Step 2: Create the hash bucket portion of the partition key.
-
- // List of pruned hash buckets per hash component.
- List<BitSet> hashComponents = new
ArrayList<>(partitionSchema.getHashBucketSchemas().size());
- for (PartitionSchema.HashBucketSchema hashSchema :
partitionSchema.getHashBucketSchemas()) {
- hashComponents.add(pruneHashComponent(schema, hashSchema, predicates));
- }
-
- // The index of the final constrained component in the partition key.
- int constrainedIndex = 0;
- if (rangeLowerBound.length > 0 || rangeUpperBound.length > 0) {
- // The range component is constrained if either of the range bounds are
- // specified (non-empty).
- constrainedIndex = partitionSchema.getHashBucketSchemas().size();
- } else {
- // Search the hash bucket constraints from right to left, looking for the
- // first constrained component.
- for (int i = hashComponents.size(); i > 0; i--) {
- int numBuckets = partitionSchema.getHashBucketSchemas().get(i -
1).getNumBuckets();
- BitSet hashBuckets = hashComponents.get(i - 1);
- if (hashBuckets.nextClearBit(0) < numBuckets) {
- constrainedIndex = i;
- break;
+ // The index of the final constrained component in the partition key.
+ int constrainedIndex = 0;
+ if (preliminaryRange.lower.length > 0 || preliminaryRange.upper.length >
0) {
+ // The range component is constrained if either of the range bounds are
+ // specified (non-empty).
+ constrainedIndex = hashBucketSchemas.size();
+ } else {
+ // Search the hash bucket constraints from right to left, looking for
the
+ // first constrained component.
+ for (int i = hashComponents.size(); i > 0; i--) {
+ int numBuckets = hashBucketSchemas.get(i - 1).getNumBuckets();
+ BitSet hashBuckets = hashComponents.get(i - 1);
+ if (hashBuckets.nextClearBit(0) < numBuckets) {
+ constrainedIndex = i;
+ break;
+ }
}
}
- }
- // Build up a set of partition key ranges out of the hash components.
- //
- // Each hash component simply appends its bucket number to the
- // partition key ranges (possibly incrementing the upper bound by one
bucket
- // number if this is the final constraint, see note 2 in the example
above).
- List<Pair<ByteVec, ByteVec>> partitionKeyRanges = new ArrayList<>();
- partitionKeyRanges.add(new Pair<>(ByteVec.create(), ByteVec.create()));
-
- for (int hashIdx = 0; hashIdx < constrainedIndex; hashIdx++) {
- // This is the final partition key component if this is the final
constrained
- // bucket, and the range upper bound is empty. In this case we need to
- // increment the bucket on the upper bound to convert from inclusive to
- // exclusive.
- boolean isLast = hashIdx + 1 == constrainedIndex &&
rangeUpperBound.length == 0;
- BitSet hashBuckets = hashComponents.get(hashIdx);
-
- List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
- new ArrayList<>(partitionKeyRanges.size() *
hashBuckets.cardinality());
- for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
- for (int bucket = hashBuckets.nextSetBit(0);
- bucket != -1;
- bucket = hashBuckets.nextSetBit(bucket + 1)) {
- int bucketUpper = isLast ? bucket + 1 : bucket;
- ByteVec lower = partitionKeyRange.getFirst().clone();
- ByteVec upper = partitionKeyRange.getFirst().clone();
- KeyEncoder.encodeHashBucket(bucket, lower);
- KeyEncoder.encodeHashBucket(bucketUpper, upper);
- newPartitionKeyRanges.add(new Pair<>(lower, upper));
+ // Build up a set of partition key ranges out of the hash components.
+ //
+ // Each hash component simply appends its bucket number to the
+ // partition key ranges (possibly incrementing the upper bound by one
bucket
+ // number if this is the final constraint, see note 2 in the example
above).
+ List<Pair<ByteVec, ByteVec>> partitionKeyRanges = new ArrayList<>();
+ partitionKeyRanges.add(new Pair<>(ByteVec.create(), ByteVec.create()));
+
+ for (int hashIdx = 0; hashIdx < constrainedIndex; hashIdx++) {
+ // This is the final partition key component if this is the final
constrained
+ // bucket, and the range upper bound is empty. In this case we need to
+ // increment the bucket on the upper bound to convert from inclusive to
+ // exclusive.
+ boolean isLast = hashIdx + 1 == constrainedIndex &&
preliminaryRange.upper.length == 0;
+ BitSet hashBuckets = hashComponents.get(hashIdx);
+
+ List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
+ new ArrayList<>(partitionKeyRanges.size() *
hashBuckets.cardinality());
+ for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
+ for (int bucket = hashBuckets.nextSetBit(0);
+ bucket != -1;
+ bucket = hashBuckets.nextSetBit(bucket + 1)) {
+ int bucketUpper = isLast ? bucket + 1 : bucket;
+ ByteVec lower = partitionKeyRange.getFirst().clone();
+ ByteVec upper = partitionKeyRange.getFirst().clone();
+ KeyEncoder.encodeHashBucket(bucket, lower);
+ KeyEncoder.encodeHashBucket(bucketUpper, upper);
+ newPartitionKeyRanges.add(new Pair<>(lower, upper));
+ }
}
+ partitionKeyRanges = newPartitionKeyRanges;
}
- partitionKeyRanges = newPartitionKeyRanges;
- }
- // Step 3: append the (possibly empty) range bounds to the partition key
ranges.
- for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
- range.getFirst().append(rangeLowerBound);
- range.getSecond().append(rangeUpperBound);
- }
+ // Step 3: append the (possibly empty) range bounds to the partition key
ranges.
+ for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
+ range.getFirst().append(preliminaryRange.lower);
+ range.getSecond().append(preliminaryRange.upper);
+ }
- // Step 4: Filter ranges that fall outside the scan's upper and lower
bound partition keys.
- Deque<Pair<byte[], byte[]>> partitionKeyRangeBytes =
- new ArrayDeque<>(partitionKeyRanges.size());
- for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
- byte[] lower = range.getFirst().toArray();
- byte[] upper = range.getSecond().toArray();
+ // Step 4: Filter ranges that fall outside the scan's upper and lower
bound partition keys.
+ for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
+ byte[] lower = range.getFirst().toArray();
+ byte[] upper = range.getSecond().toArray();
- // Sanity check that the lower bound is less than the upper bound.
- assert upper.length == 0 || Bytes.memcmp(lower, upper) < 0;
+ // Sanity check that the lower bound is less than the upper bound.
+ assert upper.length == 0 || Bytes.memcmp(lower, upper) < 0;
- // Find the intersection of the ranges.
- if (scanner.lowerBoundPartitionKey.length > 0 &&
- (lower.length == 0 || Bytes.memcmp(lower,
scanner.lowerBoundPartitionKey) < 0)) {
- lower = scanner.lowerBoundPartitionKey;
- }
- if (scanner.upperBoundPartitionKey.length > 0 &&
- (upper.length == 0 || Bytes.memcmp(upper,
scanner.upperBoundPartitionKey) > 0)) {
- upper = scanner.upperBoundPartitionKey;
- }
+ // Find the intersection of the ranges.
+ if (scanner.lowerBoundPartitionKey.length > 0 &&
+ (lower.length == 0 || Bytes.memcmp(lower,
scanner.lowerBoundPartitionKey) < 0)) {
+ lower = scanner.lowerBoundPartitionKey;
+ }
+ if (scanner.upperBoundPartitionKey.length > 0 &&
+ (upper.length == 0 || Bytes.memcmp(upper,
scanner.upperBoundPartitionKey) > 0)) {
+ upper = scanner.upperBoundPartitionKey;
+ }
- // If the intersection is valid, then add it as a range partition.
- if (upper.length == 0 || Bytes.memcmp(lower, upper) < 0) {
- partitionKeyRangeBytes.add(new Pair<>(lower, upper));
+ // If the intersection is valid, then add it as a range partition.
+ if (upper.length == 0 || Bytes.memcmp(lower, upper) < 0) {
+ partitionKeyRangeBytes.add(new Pair<>(lower, upper));
+ }
}
}
- return new PartitionPruner(partitionKeyRangeBytes);
+ // The PartitionPruner's constructor expects the collection to be sorted
+ // in ascending order.
+ Collections.sort(partitionKeyRangeBytes,
+ (lhs, rhs) -> Bytes.memcmp(lhs.getFirst(), rhs.getFirst()));
+ return new PartitionPruner(new ArrayDeque<>(partitionKeyRangeBytes));
}
/** @return {@code true} if there are more range partitions to scan. */
@@ -489,6 +504,137 @@ public class PartitionPruner {
return KeyEncoder.encodeRangePartitionKey(row, rangeSchema);
}
+ static List<PartitionSchema.EncodedRangeBoundsWithHashSchema>
splitIntoHashSpecificRanges(
+ byte[] scanLowerBound, byte[] scanUpperBound, PartitionSchema ps) {
+ final List<PartitionSchema.EncodedRangeBoundsWithHashSchema> ranges =
+ ps.getEncodedRangesWithHashSchemas();
+ final List<PartitionSchema.HashBucketSchema> tableWideHashSchema =
+ ps.getHashBucketSchemas();
+
+ // If there aren't any ranges with custom hash schemas or there isn't an
+ // intersection between the set of ranges with custom hash schemas and the
+ // scan range, the result is trivial: the whole scan range is attributed
+ // to the table-wide hash schema.
+ if (ranges.isEmpty()) {
+ return ImmutableList.of(new
PartitionSchema.EncodedRangeBoundsWithHashSchema(
+ scanLowerBound, scanUpperBound, tableWideHashSchema));
+ }
+
+ {
+ final byte[] rangesLowerBound = ranges.get(0).lower;
+ final byte[] rangesUpperBound = ranges.get(ranges.size() - 1).upper;
+
+ if ((scanUpperBound.length != 0 &&
+ Bytes.memcmp(scanUpperBound, rangesLowerBound) <= 0) ||
+ (scanLowerBound.length != 0 && rangesUpperBound.length != 0 &&
+ Bytes.memcmp(rangesUpperBound, scanLowerBound) <= 0)) {
+ return ImmutableList.of(new
PartitionSchema.EncodedRangeBoundsWithHashSchema(
+ scanLowerBound, scanUpperBound, tableWideHashSchema));
+ }
+ }
+
+ // Index of the known range with custom hash schema that the iterator is
+ // currently pointing at or about to point if the iterator is currently
+ // at the scan boundary.
+ int curIdx = -1;
+
+ // Find the first range that is at or after the specified bounds.
+ // TODO(aserbin): maybe, do this in PartitionSchema with O(ln(N))
complexity?
+ for (int idx = 0; idx < ranges.size(); ++idx) {
+ final PartitionSchema.EncodedRangeBoundsWithHashSchema range =
ranges.get(idx);
+
+ // Searching for the first range that is at or after the lower scan
bound.
+ if (curIdx >= 0 ||
+ (range.upper.length != 0 && Bytes.memcmp(range.upper,
scanLowerBound) <= 0)) {
+ continue;
+ }
+ curIdx = idx;
+ }
+
+ Preconditions.checkState(curIdx >= 0);
+ Preconditions.checkState(curIdx < ranges.size());
+
+ // Current position of the iterator.
+ byte[] curPoint = scanLowerBound;
+
+ // Iterate over the scan range from one known boundary to the next one,
+ // enumerating the resulting consecutive sub-ranges and attributing each
+ // sub-range to a proper hash schema. If that's a known range with custom
hash
+ // schema, it's attributed to its range-specific hash schema; otherwise,
+ // a sub-range is attributed to the table-wide hash schema.
+ List<PartitionSchema.EncodedRangeBoundsWithHashSchema> result = new
ArrayList<>();
+ while (curIdx < ranges.size() &&
+ (Bytes.memcmp(curPoint, scanUpperBound) < 0 || scanUpperBound.length
== 0)) {
+ // Check the disposition of cur_point related to the lower boundary
+ // of the range pointed to by 'cur_idx'.
+ final PartitionSchema.EncodedRangeBoundsWithHashSchema curRange =
ranges.get(curIdx);
+ if (Bytes.memcmp(curPoint, curRange.lower) < 0) {
+ // The iterator is before the current range:
+ // |---|
+ // ^
+ // The next known bound is either the upper bound of the current range
+ // or the upper bound of the scan.
+ byte[] upperBound;
+ if (scanUpperBound.length == 0) {
+ upperBound = curRange.lower;
+ } else {
+ if (Bytes.memcmp(curRange.lower, scanUpperBound) < 0) {
+ upperBound = curRange.lower;
+ } else {
+ upperBound = scanUpperBound;
+ }
+ }
+ result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+ curPoint, upperBound, tableWideHashSchema));
+ // Not advancing the 'cur_idx' since cur_point is either at the
beginning
+ // of the range or before it at the upper bound of the scan.
+ } else if (Bytes.memcmp(curPoint, curRange.lower) == 0) {
+ // The iterator is at the lower boundary of the current range:
+ // |---|
+ // ^
+ if ((curRange.upper.length != 0 && Bytes.memcmp(curRange.upper,
scanUpperBound) <= 0) ||
+ scanUpperBound.length == 0) {
+ // The current range is within the scan boundaries.
+ result.add(curRange);
+ } else {
+ // The current range spans over the upper bound of the scan.
+ result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+ curPoint, scanUpperBound, curRange.hashSchemas));
+ }
+ // Done with the current range, advance to the next one, if any.
+ ++curIdx;
+ } else {
+ if ((scanUpperBound.length != 0 && Bytes.memcmp(scanUpperBound,
curRange.upper) <= 0) ||
+ curRange.upper.length == 0) {
+ result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+ curPoint, scanUpperBound, curRange.hashSchemas));
+ } else {
+ result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+ curPoint, curRange.upper, curRange.hashSchemas));
+ }
+ // Done with the current range, advance to the next one, if any.
+ ++curIdx;
+ }
+ Preconditions.checkState(!result.isEmpty());
+ // Advance the iterator.
+ curPoint = result.get(result.size() - 1).upper;
+ }
+
+ // If exiting from the cycle above by the 'cur_idx < ranges.size()'
condition,
+ // check if the upper bound of the scan is beyond the upper bound of the
last
+ // range with custom hash schema. If so, add an extra range that spans from
+ // the upper bound of the last range to the upper bound of the scan.
+ Preconditions.checkState(!result.isEmpty());
+ final byte[] rangesUpperBound = result.get(result.size() - 1).upper;
+ if (Bytes.memcmp(rangesUpperBound, scanUpperBound) != 0) {
+ Preconditions.checkState(Bytes.memcmp(curPoint, rangesUpperBound) == 0);
+ result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+ curPoint, scanUpperBound, tableWideHashSchema));
+ }
+
+ return result;
+ }
+
/**
* Search all combination of in-list and equality predicates for pruneable
hash partitions.
* @return a bitset containing {@code false} bits for hash buckets which may
be pruned
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
index e34094316..aa234110c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
@@ -17,8 +17,15 @@
package org.apache.kudu.client;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.TreeSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedBytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -32,8 +39,10 @@ import org.apache.kudu.Schema;
* primary key column values of a row into a partition key that can be used to
* find the tablet containing the key.
*
- * The partition schema is made up of zero or more hash bucket components,
- * followed by a single range component.
+ * In case of table-wide hash partitioning, the partition schema is made up of
+ * zero or more hash bucket components, followed by a single range component.
+ * In case of custom hash bucketing per range, the partition schema contains
+ * information on hash bucket components per range.
*
* Each hash bucket component includes one or more columns from the primary key
* column set, with the restriction that an individual primary key column may
@@ -45,24 +54,87 @@ import org.apache.kudu.Schema;
@InterfaceStability.Unstable
public class PartitionSchema {
+ private static final class BoundsComparator
+ implements Comparator<EncodedRangeBoundsWithHashSchema>, Serializable {
+ private static final long serialVersionUID = 36028797018963969L;
+ private static final Comparator<byte[]> comparator =
+ UnsignedBytes.lexicographicalComparator();
+
+ @Override
+ public int compare(EncodedRangeBoundsWithHashSchema lhs,
+ EncodedRangeBoundsWithHashSchema rhs) {
+ return comparator.compare(lhs.lower, rhs.lower);
+ }
+ }
+
+ private static final Comparator<EncodedRangeBoundsWithHashSchema> COMPARATOR
=
+ new BoundsComparator();
+
private final RangeSchema rangeSchema;
private final List<HashBucketSchema> hashBucketSchemas;
+ private final List<RangeWithHashSchema> rangesWithHashSchemas;
+ private final List<EncodedRangeBoundsWithHashSchema>
encodedRangesWithHashSchemas;
+ private TreeSet<EncodedRangeBoundsWithHashSchema> hashSchemasPerRange;
private final boolean isSimple;
+ static class EncodedRangeBoundsWithHashSchema {
+ final byte[] lower;
+ final byte[] upper;
+ final List<HashBucketSchema> hashSchemas;
+
+ public EncodedRangeBoundsWithHashSchema(
+ byte[] lower,
+ byte[] upper,
+ List<HashBucketSchema> hashSchemas) {
+ Preconditions.checkNotNull(lower);
+ Preconditions.checkNotNull(upper);
+ Preconditions.checkState(upper.length == 0 || Bytes.memcmp(lower, upper)
< 0);
+ this.lower = lower;
+ this.upper = upper;
+ this.hashSchemas = hashSchemas;
+ }
+ }
+
/**
* Creates a new partition schema from the range and hash bucket schemas.
*
* @param rangeSchema the range schema
- * @param hashBucketSchemas the hash bucket schemas
+ * @param hashBucketSchemas the table-wide hash schema
* @param schema the table schema
*/
public PartitionSchema(RangeSchema rangeSchema,
- List<HashBucketSchema> hashBucketSchemas,
- Schema schema) {
+ List<HashBucketSchema> hashBucketSchemas,
+ List<RangeWithHashSchema> rangesWithHashSchemas,
+ Schema schema) {
this.rangeSchema = rangeSchema;
this.hashBucketSchemas = hashBucketSchemas;
+ this.rangesWithHashSchemas = rangesWithHashSchemas;
+ this.hashSchemasPerRange = new TreeSet<>(COMPARATOR);
+ this.encodedRangesWithHashSchemas = new
ArrayList<>(rangesWithHashSchemas.size());
+
+ for (RangeWithHashSchema rhs : this.rangesWithHashSchemas) {
+ final boolean isLowerBoundEmpty =
+ rhs.lowerBound == null ||
rhs.lowerBound.getColumnsBitSet().isEmpty();
+ byte[] lower = isLowerBoundEmpty ? new byte[0]
+ : KeyEncoder.encodeRangePartitionKey(rhs.lowerBound,
this.rangeSchema);
+ final boolean isUpperBoundEmpty =
+ rhs.upperBound == null ||
rhs.upperBound.getColumnsBitSet().isEmpty();
+ byte[] upper = isUpperBoundEmpty ? new byte[0]
+ : KeyEncoder.encodeRangePartitionKey(rhs.upperBound,
this.rangeSchema);
+ if (!hashSchemasPerRange.add(
+ new EncodedRangeBoundsWithHashSchema(lower, upper,
rhs.hashSchemas))) {
+ throw new IllegalArgumentException(
+ rhs.lowerBound.toString() + ": duplicate lower range boundary");
+ }
+ }
- boolean isSimple = hashBucketSchemas.isEmpty() &&
+ // Populate the convenience collection storing the information on ranges
+ // with encoded bounds sorted in ascending order by lower bounds.
+ encodedRangesWithHashSchemas.addAll(this.hashSchemasPerRange);
+
+ boolean isSimple =
+ rangesWithHashSchemas.isEmpty() &&
+ hashBucketSchemas.isEmpty() &&
rangeSchema.columns.size() == schema.getPrimaryKeyColumnCount();
if (isSimple) {
int i = 0;
@@ -76,6 +148,19 @@ public class PartitionSchema {
this.isSimple = isSimple;
}
+ /**
+ * Creates a new partition schema from the range and hash bucket schemas.
+ *
+ * @param rangeSchema the range schema
+ * @param hashBucketSchemas the table-wide hash schema
+ * @param schema the table schema
+ */
+ public PartitionSchema(RangeSchema rangeSchema,
+ List<HashBucketSchema> hashBucketSchemas,
+ Schema schema) {
+ this(rangeSchema, hashBucketSchemas, ImmutableList.of(), schema);
+ }
+
/**
* Returns the encoded partition key of the row.
* @return a byte array containing the encoded partition key of the row
@@ -92,6 +177,14 @@ public class PartitionSchema {
return hashBucketSchemas;
}
+ List<RangeWithHashSchema> getRangesWithHashSchemas() {
+ return rangesWithHashSchemas;
+ }
+
+ List<EncodedRangeBoundsWithHashSchema> getEncodedRangesWithHashSchemas() {
+ return encodedRangesWithHashSchemas;
+ }
+
/**
   * Returns true if the partition schema does not include any hash
   * components, and the range columns match the table's primary key columns.
@@ -102,6 +195,41 @@ public class PartitionSchema {
return isSimple;
}
+ /**
+ * @return whether the partition schema has ranges with custom hash schemas.
+ */
+ boolean hasCustomHashSchemas() {
+ return !rangesWithHashSchemas.isEmpty();
+ }
+
+ /**
+ * Find hash schema for the given encoded range key. Depending on the
+ * partition schema and the key, it might be either table-wide or a custom
+ * hash schema for a particular range.
+ *
+ * @return hash bucket schema for the encoded range key
+ */
+ List<HashBucketSchema> getHashSchemaForRange(byte[] rangeKey) {
+ if (!hasCustomHashSchemas()) {
+ // By definition, the table-wide hash schema provides the hash bucketing
+ // structure in the absence of per-range custom hash schemas.
+ return hashBucketSchemas;
+ }
+
+ final EncodedRangeBoundsWithHashSchema entry = hashSchemasPerRange.floor(
+ new EncodedRangeBoundsWithHashSchema(rangeKey, new byte[0],
ImmutableList.of()));
+ if (entry == null) {
+ return hashBucketSchemas;
+ }
+    // Check if 'rangeKey' is in the range (an empty encoded upper boundary
+    // means an unbounded range partition).
+ final byte[] upper = entry.upper;
+ if (upper == null || Bytes.memcmp(rangeKey, upper) < 0) {
+ return entry.hashSchemas;
+ }
+ return hashBucketSchemas;
+ }
+
public static class RangeSchema {
private final List<Integer> columns;
@@ -155,4 +283,27 @@ public class PartitionSchema {
return seed;
}
}
+
+ /**
+ * This utility class is used to represent information on a custom hash
schema
+ * for a particular range.
+ */
+ public static class RangeWithHashSchema {
+ public PartialRow lowerBound;
+ public PartialRow upperBound;
+ public List<HashBucketSchema> hashSchemas;
+
+ public RangeWithHashSchema(
+ PartialRow lowerBound,
+ PartialRow upperBound,
+ List<HashBucketSchema> hashSchemas) {
+ Preconditions.checkNotNull(lowerBound);
+ Preconditions.checkNotNull(upperBound);
+ Preconditions.checkArgument(
+ lowerBound.getSchema().equals(upperBound.getSchema()));
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ this.hashSchemas = hashSchemas;
+ }
+ }
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index 4381fd8ce..c9d94e7af 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -36,6 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Common;
+import org.apache.kudu.RowOperations;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.util.DateUtil;
@@ -222,7 +223,35 @@ public class ProtobufHelper {
hashSchemas.add(hashSchema);
}
- return new PartitionSchema(rangeSchema, hashSchemas.build(), schema);
+ // Populate the list of ranges with custom hash schemas.
+ ImmutableList.Builder<PartitionSchema.RangeWithHashSchema>
rangesWithHashSchemas =
+ ImmutableList.builder();
+
+ for (Common.PartitionSchemaPB.RangeWithHashSchemaPB rhsPB :
+ pb.getCustomHashSchemaRangesList()) {
+ List<PartitionSchema.HashBucketSchema> rangeHashSchemas = new
ArrayList<>();
+ for (Common.PartitionSchemaPB.HashBucketSchemaPB hbs :
rhsPB.getHashSchemaList()) {
+ rangeHashSchemas.add(new PartitionSchema.HashBucketSchema(
+ pbToIds(hbs.getColumnsList()), hbs.getNumBuckets(),
hbs.getSeed()));
+ }
+
+ // Decode RowOperationsPB into the range bounds.
+ final RowOperations.RowOperationsPB rangeBounds = rhsPB.getRangeBounds();
+ Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
+ final List<RangePartition> partitions =
dec.decodeRangePartitions(rangeBounds, schema);
+ if (partitions.size() != 1) {
+ throw new IllegalArgumentException("unexpected range bounds");
+ }
+ final RangePartition p = partitions.get(0);
+
+ PartitionSchema.RangeWithHashSchema rhs =
+ new PartitionSchema.RangeWithHashSchema(
+ p.lowerBound, p.upperBound, rangeHashSchemas);
+ rangesWithHashSchemas.add(rhs);
+ }
+
+ return new PartitionSchema(
+ rangeSchema, hashSchemas.build(), rangesWithHashSchemas.build(),
schema);
}
public static Common.PartitionSchemaPB partitionSchemaToPb(PartitionSchema
partitionSchema) {
@@ -244,6 +273,28 @@ public class ProtobufHelper {
.build();
builder.setRangeSchema(rangeSchemaPB);
+ // Based on the list of ranges with custom hash schemas, populate the
+ // PartitionSchemaPB.custom_hash_schema_ranges field.
+ for (PartitionSchema.RangeWithHashSchema rhs :
partitionSchema.getRangesWithHashSchemas()) {
+ Common.PartitionSchemaPB.RangeWithHashSchemaPB.Builder rhsBuilder =
+ Common.PartitionSchemaPB.RangeWithHashSchemaPB.newBuilder();
+ for (PartitionSchema.HashBucketSchema hbs : rhs.hashSchemas) {
+ Common.PartitionSchemaPB.HashBucketSchemaPB.Builder hbsBuilder =
+ Common.PartitionSchemaPB.HashBucketSchemaPB.newBuilder()
+ .addAllColumns(idsToPb(hbs.getColumnIds()))
+ .setNumBuckets(hbs.getNumBuckets())
+ .setSeed(hbs.getSeed());
+ rhsBuilder.addHashSchema(hbsBuilder.build());
+ }
+
+ rhsBuilder.setRangeBounds(new
Operation.OperationsEncoder().encodeLowerAndUpperBounds(
+ rhs.lowerBound,
+ rhs.upperBound,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND));
+ builder.addCustomHashSchemaRanges(rhsBuilder.build());
+ }
+
return builder.build();
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartition.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartition.java
new file mode 100644
index 000000000..903f46ac8
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartition.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * This class represents a range partition with the table-wide hash schema.
+ *
+ * See also RangePartitionWithCustomHashSchema.
+ */
[email protected]
[email protected]
+class RangePartition {
+ final PartialRow lowerBound;
+ final PartialRow upperBound;
+ final RangePartitionBound lowerBoundType;
+ final RangePartitionBound upperBoundType;
+
+ public RangePartition(PartialRow lowerBound,
+ PartialRow upperBound,
+ RangePartitionBound lowerBoundType,
+ RangePartitionBound upperBoundType) {
+ Preconditions.checkNotNull(lowerBound);
+ Preconditions.checkNotNull(upperBound);
+ Preconditions.checkArgument(
+ lowerBound.getSchema().equals(upperBound.getSchema()));
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ this.lowerBoundType = lowerBoundType;
+ this.upperBoundType = upperBoundType;
+ }
+
+ public PartialRow getLowerBound() {
+ return lowerBound;
+ }
+
+ public RangePartitionBound getLowerBoundType() {
+ return lowerBoundType;
+ }
+
+ public PartialRow getUpperBound() {
+ return upperBound;
+ }
+
+ public RangePartitionBound getUpperBoundType() {
+ return upperBoundType;
+ }
+}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartitionWithCustomHashSchema.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartitionWithCustomHashSchema.java
new file mode 100644
index 000000000..df3c0c676
--- /dev/null
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartitionWithCustomHashSchema.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.util.List;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.Common;
+
+/**
+ * This class represents a range partition with custom hash bucketing schema.
+ *
+ * See also RangePartition.
+ */
[email protected]
[email protected]
+public class RangePartitionWithCustomHashSchema extends RangePartition {
+ // Using the corresponding PB type to represent this range with its custom
+ // hash schema.
+ private Common.PartitionSchemaPB.RangeWithHashSchemaPB.Builder pb =
+ Common.PartitionSchemaPB.RangeWithHashSchemaPB.newBuilder();
+
+ /**
+   * @param lowerBound lower bound of the range partition
+   * @param upperBound upper bound of the range partition
+ * @param lowerBoundType lower bound type: inclusive/exclusive
+ * @param upperBoundType upper bound type: inclusive/exclusive
+ * @return new RangePartitionWithCustomHashSchema object
+ */
+ public RangePartitionWithCustomHashSchema(
+ PartialRow lowerBound,
+ PartialRow upperBound,
+ RangePartitionBound lowerBoundType,
+ RangePartitionBound upperBoundType) {
+ super(lowerBound, upperBound, lowerBoundType, upperBoundType);
+ pb.setRangeBounds(
+ new Operation.OperationsEncoder().encodeLowerAndUpperBounds(
+ lowerBound, upperBound, lowerBoundType, upperBoundType));
+ }
+
+ /**
+ * Add a level of hash sub-partitioning for this range partition.
+ *
+ * The hash schema for the range partition is defined by the whole set of
+ * its hash sub-partitioning levels. A range partition can have zero or
+ * multiple levels of hash sub-partitioning: this method can be called
+ * many times on the same object to define a multi-dimensional hash
+ * bucketing structure for the range.
+ *
+ * @param columns name of table's columns to use for hash bucketing
+ * @param numBuckets number of buckets used by the hash function
+ * @param seed the seed for the hash function
+ * @return this RangePartition object modified accordingly
+ */
+ public RangePartition addHashPartitions(
+ List<String> columns, int numBuckets, int seed) {
+ Common.PartitionSchemaPB.HashBucketSchemaPB.Builder b =
+ pb.addHashSchemaBuilder();
+ for (String column : columns) {
+ b.addColumnsBuilder().setName(column);
+ }
+ b.setNumBuckets(numBuckets);
+ b.setSeed(seed);
+ return this;
+ }
+
+ public Common.PartitionSchemaPB.RangeWithHashSchemaPB toPB() {
+ return pb.build();
+ }
+}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
index 91d5f76b1..b200df152 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
@@ -94,8 +94,7 @@ public class TestKeyEncoding {
columnIds.add(i);
}
return new PartitionSchema(
- new PartitionSchema.RangeSchema(columnIds),
- ImmutableList.of(), schema);
+ new PartitionSchema.RangeSchema(columnIds), ImmutableList.of(),
schema);
}
/**
@@ -380,6 +379,62 @@ public class TestKeyEncoding {
});
}
+ @Test
+ public void testPartitionKeyEncodingCustomHashSchema() {
+ Schema schema = buildSchema(
+ new ColumnSchemaBuilder("a", Type.INT32).key(true),
+ new ColumnSchemaBuilder("b", Type.STRING).key(true),
+ new ColumnSchemaBuilder("c", Type.STRING).key(true));
+
+ PartialRow lower = schema.newPartialRow();
+ lower.addInt("a", 0);
+ lower.addString("b", "B");
+ lower.addString("c", "C");
+
+ PartialRow upper = schema.newPartialRow();
+ upper.addInt("a", 10);
+ upper.addString("b", "b");
+ upper.addString("c", "c");
+
+ final PartitionSchema partitionSchema =
+ new PartitionSchema(
+ new RangeSchema(ImmutableList.of(0, 1, 2)),
+ ImmutableList.of(
+ new HashBucketSchema(ImmutableList.of(2), 3, 0)),
+ ImmutableList.of(
+ new PartitionSchema.RangeWithHashSchema(
+ lower,
+ upper,
+ ImmutableList.of(new HashBucketSchema(ImmutableList.of(0,
1), 32, 0)))),
+ schema);
+
+ // That's the row in the range having its own custom hash schema.
+ PartialRow rowA = schema.newPartialRow();
+ rowA.addInt("a", 1);
+ rowA.addString("b", "C");
+ rowA.addString("c", "D");
+ assertBytesEquals(KeyEncoder.encodePartitionKey(rowA, partitionSchema),
+ new byte[]{
          0, 0, 0, 0x10, // hash(1, "C")
+ (byte) 0x80, 0, 0, 1, // a = 1
+ 'C', 0, 0, // b = "C"
+ 'D' // c = "D"
+ });
+
+ // That's the row encoded with the table-wide hash schema.
+ PartialRow rowB = schema.newPartialRow();
+ rowB.addInt("a", 11);
+ rowB.addString("b", "");
+ rowB.addString("c", "d");
+ assertBytesEquals(KeyEncoder.encodePartitionKey(rowB, partitionSchema),
+ new byte[]{
+ 0, 0, 0, 0x2, // hash("d")
+ (byte) 0x80, 0, 0, 11,// a = 11
+ 0, 0, // b = ""
+ 'd' // c = "d"
+ });
+ }
+
@Test(timeout = 100000)
public void testAllPrimaryKeyTypes() throws Exception {
Schema schema = buildSchema(
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 290ed3b7c..00e5d8f40 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -741,6 +741,109 @@ public class TestKuduClient {
}
}
+ /**
+ * Test inserting and retrieving rows from a table that has a range partition
+ * with custom hash schema.
+ */
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testRangeWithCustomHashSchema() throws Exception {
+ List<ColumnSchema> cols = new ArrayList<>();
+ cols.add(new ColumnSchema.ColumnSchemaBuilder("c0",
Type.INT64).key(true).build());
+ cols.add(new ColumnSchema.ColumnSchemaBuilder("c1",
Type.INT32).nullable(true).build());
+ Schema schema = new Schema(cols);
+
+ CreateTableOptions options = new CreateTableOptions();
+ options.setRangePartitionColumns(ImmutableList.of("c0"));
+ options.addHashPartitions(ImmutableList.of("c0"), 2);
+
+ // Add range partition with table-wide hash schema.
+ {
+ PartialRow lower = schema.newPartialRow();
+ lower.addLong("c0", -100);
+ PartialRow upper = schema.newPartialRow();
+ upper.addLong("c0", 100);
+ options.addRangePartition(lower, upper);
+ }
+
+ // Add a partition with custom hash schema.
+ {
+ PartialRow lower = schema.newPartialRow();
+ lower.addLong("c0", 100);
+ PartialRow upper = schema.newPartialRow();
+ upper.addLong("c0", 200);
+
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("c0"), 5, 0);
+ options.addRangePartition(rangePartition);
+ }
+
+ client.createTable(TABLE_NAME, schema, options);
+
+ KuduSession session = client.newSession();
+ session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+ KuduTable table = client.openTable(TABLE_NAME);
+
+ // Check the range with the table-wide hash schema.
+ {
+ for (int i = 0; i < 10; ++i) {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addLong("c0", i);
+ row.addInt("c1", 1000 * i);
+ session.apply(insert);
+ }
+
+ // Scan all the rows in the table.
+ List<String> rowStringsAll = scanTableToStrings(table);
+ assertEquals(10, rowStringsAll.size());
+
+ // Now scan the rows that are in the range with the table-wide hash
schema.
+ List<String> rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(schema.getColumn("c0"),
GREATER_EQUAL, 0),
+ KuduPredicate.newComparisonPredicate(schema.getColumn("c0"), LESS,
100));
+ assertEquals(10, rowStrings.size());
+ for (int i = 0; i < rowStrings.size(); ++i) {
+ StringBuilder expectedRow = new StringBuilder();
+ expectedRow.append(String.format("INT64 c0=%d, INT32 c1=%d", i, 1000 *
i));
+ assertEquals(expectedRow.toString(), rowStrings.get(i));
+ }
+ }
+
+ // Check the range with the custom hash schema.
+ {
+ for (int i = 100; i < 110; ++i) {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addLong("c0", i);
+ row.addInt("c1", 2 * i);
+ session.apply(insert);
+ }
+
+ // Scan all the rows in the table.
+ List<String> rowStringsAll = scanTableToStrings(table);
+ assertEquals(20, rowStringsAll.size());
+
+ // Now scan the rows that are in the range with the custom hash schema.
+ List<String> rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(schema.getColumn("c0"),
GREATER_EQUAL, 100));
+ assertEquals(10, rowStrings.size());
+ for (int i = 0; i < rowStrings.size(); ++i) {
+ StringBuilder expectedRow = new StringBuilder();
+ expectedRow.append(String.format("INT64 c0=%d, INT32 c1=%d",
+ i + 100, 2 * (i + 100)));
+ assertEquals(expectedRow.toString(), rowStrings.get(i));
+ }
+ }
+ }
+
/**
* Test scanning with limits.
*/
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
index 3303d1151..5175ee95d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
@@ -17,6 +17,11 @@
package org.apache.kudu.client;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.EQUAL;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER_EQUAL;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
@@ -30,10 +35,14 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -483,6 +492,899 @@ public class TestKuduTable {
client.openTable(tableName).getFormattedRangePartitions(10000));
}
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testCreateTablePartitionWithEmptyCustomHashSchema() throws
Exception {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, -100);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, 100);
+
+ CreateTableOptions builder = getBasicCreateTableOptions();
+
+ // Using an empty custom hash schema for the range.
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ builder.addRangePartition(rangePartition);
+
+ KuduTable table = client.createTable(tableName, basicSchema, builder);
+
+ // Check the result: retrieve the information on tablets from master
+ // and check if each partition has the expected parameters.
+ {
+ for (KuduScanToken token : new
KuduScanToken.KuduScanTokenBuilder(asyncClient, table)
+ .setTimeout(client.getDefaultOperationTimeoutMs()).build()) {
+ Partition p = token.getTablet().getPartition();
+ // No hash partitions are expected.
+ assertEquals(0, p.getHashBuckets().size());
+ }
+
+ final List<Partition> rangePartitions =
+ table.getRangePartitions(client.getDefaultOperationTimeoutMs());
+ assertEquals(1, rangePartitions.size());
+ final Partition p = rangePartitions.get(0);
+
+ assertTrue(p.getRangeKeyStart().length > 0);
+ PartialRow rangeKeyStartDecoded = p.getDecodedRangeKeyStart(table);
+ assertEquals(-100, rangeKeyStartDecoded.getInt(0));
+ assertTrue(p.getRangeKeyEnd().length > 0);
+ PartialRow rangeKeyEndDecoded = p.getDecodedRangeKeyEnd(table);
+ assertEquals(100, rangeKeyEndDecoded.getInt(0));
+ }
+
+ List<String> expected = Lists.newArrayList();
+ expected.add("-100 <= VALUES < 100");
+ assertEquals(
+ expected,
+ client.openTable(tableName).getFormattedRangePartitions(10000));
+ }
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testCreateTablePartitionWithCustomHashSchema() throws Exception {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, -100);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, 200);
+
+ // Simple custom hash schema for the range: two buckets on the column
"key".
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("key"), 2, 0);
+
+ CreateTableOptions builder = getBasicCreateTableOptions();
+ builder.addRangePartition(rangePartition);
+
+ // Add table-wide schema: it should have the same number of dimensions
+ // as the range-specific hash schema. However, this schema isn't used
+ // in this test scenario.
+ builder.addHashPartitions(ImmutableList.of("key"), 7, 0);
+
+ KuduTable table = client.createTable(tableName, basicSchema, builder);
+
+ List<String> expected = Lists.newArrayList();
+ expected.add("-100 <= VALUES < 200");
+ assertEquals(
+ expected,
+ client.openTable(tableName).getFormattedRangePartitions(10000));
+
+ // Check the result: retrieve the information on tablets from master
+ // and check if each partition has expected parameters.
+ {
+ Set<Integer> buckets = new HashSet();
+ for (KuduScanToken token : new
KuduScanToken.KuduScanTokenBuilder(asyncClient, table)
+ .setTimeout(client.getDefaultOperationTimeoutMs()).build()) {
+ Partition p = token.getTablet().getPartition();
+        // A single hash dimension (with two buckets) is expected per range.
+ assertEquals(1, p.getHashBuckets().size());
+ for (Integer idx : p.getHashBuckets()) {
+ buckets.add(idx);
+ }
+ }
+ assertEquals(2, buckets.size());
+ assertTrue(buckets.contains(0));
+ assertTrue(buckets.contains(1));
+
+ final List<Partition> rangePartitions =
+ table.getRangePartitions(client.getDefaultOperationTimeoutMs());
+ assertEquals(1, rangePartitions.size());
+ final Partition p = rangePartitions.get(0);
+
+ assertTrue(p.getRangeKeyStart().length > 0);
+ PartialRow rangeKeyStartDecoded = p.getDecodedRangeKeyStart(table);
+ assertEquals(-100, rangeKeyStartDecoded.getInt(0));
+ assertTrue(p.getRangeKeyEnd().length > 0);
+ PartialRow rangeKeyEndDecoded = p.getDecodedRangeKeyEnd(table);
+ assertEquals(200, rangeKeyEndDecoded.getInt(0));
+ }
+ }
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testRangePartitionWithCustomHashSchemaBasic() throws Exception {
+ final int valLower = 10;
+ final int valUpper = 20;
+
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, valLower);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, valUpper);
+
+ // Simple custom hash schema for the range: five buckets on the column
"key".
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+ CreateTableOptions builder = getBasicCreateTableOptions();
+ builder.addRangePartition(rangePartition);
+ // Add table-wide schema: it should have the same number of dimensions
+ // as the range-specific hash schema. However, this schema isn't used
+ // in this test scenario.
+ builder.addHashPartitions(ImmutableList.of("key"), 32, 0);
+
+ final KuduTable table = client.createTable(tableName, basicSchema,
builder);
+ final PartitionSchema ps = table.getPartitionSchema();
+ assertTrue(ps.hasCustomHashSchemas());
+ assertFalse(ps.isSimpleRangePartitioning());
+
+ // NOTE: use schema from server since ColumnIDs are needed for row encoding
+ final PartialRow rowLower = table.getSchema().newPartialRow();
+ rowLower.addInt(0, valLower);
+
+ final PartialRow rowUpper = table.getSchema().newPartialRow();
+ rowUpper.addInt(0, valUpper);
+
+ {
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(rowLower, ps.getRangeSchema()));
+ // There should be just one dimension with five buckets.
+ assertEquals(1, s.size());
+ assertEquals(5, s.get(0).getNumBuckets());
+ }
+ {
+      // The newly created table has a single range with 5 hash buckets, but
+      // KuduTable.getRangePartitions() removes the 'duplicates' with hash
+      // bucket index other than 0. So, the result should be just one
+      // partition with hash bucket index 0.
+ List<Partition> partitions = table.getRangePartitions(50000);
+ assertEquals(1, partitions.size());
+ List<Integer> buckets = partitions.get(0).getHashBuckets();
+ assertEquals(1, buckets.size()); // there is just one hash dimension
+ assertEquals(0, buckets.get(0).intValue());
+ }
+ {
+ final byte[] rowLowerEnc = ps.encodePartitionKey(rowLower);
+ final byte[] rowUpperEnc = ps.encodePartitionKey(rowUpper);
+
+ // The range part comes after the hash part in an encoded partition key.
+ // The hash part contains 4 * number_of_hash_dimensions bytes.
+ byte[] hashLower = Arrays.copyOfRange(rowLowerEnc, 4,
rowLowerEnc.length);
+ byte[] hashUpper = Arrays.copyOfRange(rowUpperEnc, 4,
rowUpperEnc.length);
+
+ Set<Integer> buckets = new HashSet();
+ for (KuduScanToken token : new
KuduScanToken.KuduScanTokenBuilder(asyncClient, table)
+ .setTimeout(client.getDefaultOperationTimeoutMs()).build()) {
+ final Partition p = token.getTablet().getPartition();
+ assertEquals(0, Bytes.memcmp(p.getRangeKeyStart(), hashLower));
+ assertEquals(0, Bytes.memcmp(p.getRangeKeyEnd(), hashUpper));
+ assertEquals(1, p.getHashBuckets().size());
+ buckets.add(p.getHashBuckets().get(0));
+ }
+
+ // Check the generated scan tokens cover all the tablets for the range:
+ // all hash bucket indices should be present.
+ assertEquals(5, buckets.size());
+ assertTrue(buckets.contains(0));
+ assertTrue(buckets.contains(1));
+ assertTrue(buckets.contains(2));
+ assertTrue(buckets.contains(3));
+ assertTrue(buckets.contains(4));
+ }
+ }
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testCreateTableCustomHashSchemasTwoRanges() throws Exception {
+ CreateTableOptions builder = getBasicCreateTableOptions();
+
+ {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, 0);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, 100);
+
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("key"), 2, 0);
+ builder.addRangePartition(rangePartition);
+ }
+
+ {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, 100);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, 200);
+
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("key"), 3, 0);
+ builder.addRangePartition(rangePartition);
+ }
+
+    // Add the table-wide hash schema as well -- that's to satisfy the
+    // constraint on the number of hash dimensions in the table's hash schemas.
+    // However, this scenario isn't going to create a range with that schema.
+ builder.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+ KuduTable table = client.createTable(tableName, basicSchema, builder);
+
+ // Check the result: retrieve the information on tablets from master
+ // and check if each partition has expected parameters.
+ List<LocatedTablet> tablets = table.getTabletsLocations(10000);
+    // There should be 5 tablets: 2 for the [0, 100) range and 3 for [100, 200).
+ assertEquals(5, tablets.size());
+
+ List<String> expected = Lists.newArrayList();
+ expected.add("0 <= VALUES < 100");
+ expected.add("100 <= VALUES < 200");
+ assertEquals(
+ expected,
+ client.openTable(tableName).getFormattedRangePartitions(10000));
+
+ // Insert data into the newly created table and read it back.
+ KuduSession session = client.newSession();
+ for (int key = 0; key < 200; ++key) {
+ Insert insert = createBasicSchemaInsert(table, key);
+ session.apply(insert);
+ }
+ session.flush();
+
+ // Do full table scan.
+ List<String> rowStrings = scanTableToStrings(table);
+ assertEquals(200, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, -1));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 0));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 1));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 99));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 100));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 101));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 199));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 200));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 201));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER, 0));
+ assertEquals(199, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER, 100));
+ assertEquals(99, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER, 199));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER, 200));
+ assertEquals(0, rowStrings.size());
+
+ // Predicate to have all rows in the range with table-wide hash schema.
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 0),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 100));
+ assertEquals(100, rowStrings.size());
+
+ // Predicate to have all rows in the range with custom hash schema.
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 100),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 200));
+ assertEquals(100, rowStrings.size());
+
+ // Predicate to have one part of the rows in the range with table-wide hash
+ // schema, and the other part from the range with custom hash schema.
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 50),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 150));
+ assertEquals(100, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 150));
+ assertEquals(150, rowStrings.size());
+
+    // Predicates that almost cover both of the ranges (sort of an off-by-one
situation).
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 1),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 199));
+ assertEquals(198, rowStrings.size());
+
+    // Predicates that almost cover both of the ranges (sort of an off-by-one
situation).
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 1),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 200));
+ assertEquals(199, rowStrings.size());
+
+    // Predicates that almost cover both of the ranges (sort of an off-by-one
situation).
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 0),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 199));
+ assertEquals(199, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 199));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS_EQUAL, 0));
+ assertEquals(1, rowStrings.size());
+
+ // Predicate to cover exactly both ranges.
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 0),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 200));
+ assertEquals(200, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 200));
+ assertEquals(200, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 0));
+ assertEquals(200, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 200));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 0));
+ assertEquals(0, rowStrings.size());
+ }
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testCreateTableCustomHashSchemasTwoMixedRanges() throws
Exception {
+ CreateTableOptions builder = getBasicCreateTableOptions();
+
+ {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, 0);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, 100);
+
+ // Simple custom hash schema for the range: two buckets on the column
"key".
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("key"), 2, 0);
+ builder.addRangePartition(rangePartition);
+ }
+
+ // Add table-wide schema as well.
+ builder.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+ // Add a range to have the table-wide hash schema.
+ {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, 100);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, 200);
+ builder.addRangePartition(lower, upper);
+ }
+
+ KuduTable table = client.createTable(tableName, basicSchema, builder);
+
+ // Check the result: retrieve the information on tablets from master
+ // and check if each partition has expected parameters.
+ List<LocatedTablet> tablets = table.getTabletsLocations(10000);
+ // There should be 7 tablets: 2 for the [0, 100) range and 5 for [100,
200).
+ assertEquals(7, tablets.size());
+
+ List<String> expected = Lists.newArrayList();
+ expected.add("0 <= VALUES < 100");
+ expected.add("100 <= VALUES < 200");
+ assertEquals(
+ expected,
+ client.openTable(tableName).getFormattedRangePartitions(10000));
+
+ // Insert data into the newly created table and read it back.
+ KuduSession session = client.newSession();
+ for (int key = 0; key < 200; ++key) {
+ Insert insert = createBasicSchemaInsert(table, key);
+ session.apply(insert);
+ }
+ session.flush();
+
+ // Do full table scan.
+ List<String> rowStrings = scanTableToStrings(table);
+ assertEquals(200, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, -1));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 0));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 1));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 99));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 100));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 101));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 199));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 200));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
EQUAL, 201));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER, 0));
+ assertEquals(199, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER, 100));
+ assertEquals(99, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER, 199));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER, 200));
+ assertEquals(0, rowStrings.size());
+
+ // Predicate to have all rows in the range with table-wide hash schema.
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 0),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 100));
+ assertEquals(100, rowStrings.size());
+
+ // Predicate to have all rows in the range with custom hash schema.
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 100),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 200));
+ assertEquals(100, rowStrings.size());
+
+ // Predicate to have one part of the rows in the range with table-wide hash
+ // schema, and the other part from the range with custom hash schema.
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 50),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 150));
+ assertEquals(100, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 150));
+ assertEquals(150, rowStrings.size());
+
+    // Predicates that almost cover both of the ranges (sort of an off-by-one
situation).
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 1),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 199));
+ assertEquals(198, rowStrings.size());
+
+    // Predicates that almost cover both of the ranges (sort of an off-by-one
situation).
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 1),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 200));
+ assertEquals(199, rowStrings.size());
+
+    // Predicates that almost cover both of the ranges (sort of an off-by-one
situation).
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 0),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 199));
+ assertEquals(199, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 199));
+ assertEquals(1, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS_EQUAL, 0));
+ assertEquals(1, rowStrings.size());
+
+ // Predicate to cover exactly both ranges.
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 0),
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 200));
+ assertEquals(200, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 200));
+ assertEquals(200, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 0));
+ assertEquals(200, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
GREATER_EQUAL, 200));
+ assertEquals(0, rowStrings.size());
+
+ rowStrings = scanTableToStrings(table,
+ KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"),
LESS, 0));
+ assertEquals(0, rowStrings.size());
+ }
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testCreateTableCustomHashSchemaDifferentDimensions() throws
Exception {
+    // Have the table-wide hash schema differ from the ranges' custom hash
+    // schemas in the number of dimensions: creating such a table should fail.
+ ArrayList<ColumnSchema> columns = new ArrayList<>(3);
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c0i",
Type.INT32).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c1i",
Type.INT64).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c2s",
Type.STRING).key(true).build());
+ final Schema schema = new Schema(columns);
+
+ CreateTableOptions builder = new
CreateTableOptions().setRangePartitionColumns(
+ ImmutableList.of("c0i"));
+
+ // Add table-wide schema with two hash dimensions.
+ builder.addHashPartitions(ImmutableList.of("c1i"), 7, 0);
+ builder.addHashPartitions(ImmutableList.of("c2s"), 3, 0);
+
+    // Simple custom hash schema for the range: two buckets on the column
"c0i".
+ {
+ PartialRow lower = schema.newPartialRow();
+ lower.addInt(0, -100);
+ PartialRow upper = schema.newPartialRow();
+ upper.addInt(0, 200);
+
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("c0i"), 2, 0);
+ builder.addRangePartition(rangePartition);
+ }
+
+ try {
+ client.createTable(tableName, schema, builder);
+ fail("shouldn't be able to create a table with hash schemas varying in "
+
+ "number of hash dimensions across table partitions");
+ } catch (KuduException ex) {
+ assertTrue(ex.getStatus().isNotSupported());
+ final String errmsg = ex.getMessage();
+ assertTrue(errmsg, errmsg.matches(
+ "varying number of hash dimensions per range is not yet supported"));
+ }
+
+    // OK, now try a mixed case: add one more range whose hash schema matches
+    // the number of hash dimensions of the table-wide hash schema, while the
+    // range added above still has a different number of hash dimensions.
+    // Custom hash schema for this range: two hash dimensions -- two buckets
on "c0i" and three buckets on "c1i".
+ {
+ PartialRow lower = schema.newPartialRow();
+ lower.addInt(0, 200);
+ PartialRow upper = schema.newPartialRow();
+ upper.addInt(0, 300);
+
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("c0i"), 2, 0);
+ rangePartition.addHashPartitions(ImmutableList.of("c1i"), 3, 0);
+ builder.addRangePartition(rangePartition);
+ }
+
+ try {
+ client.createTable(tableName, schema, builder);
+ fail("shouldn't be able to create a table with hash schemas varying in "
+
+ "number of hash dimensions across table partitions");
+ } catch (KuduException ex) {
+ assertTrue(ex.getStatus().isNotSupported());
+ final String errmsg = ex.getMessage();
+ assertTrue(errmsg, errmsg.matches(
+ "varying number of hash dimensions per range is not yet supported"));
+ }
+ }
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testGetHashSchemaForRange() throws Exception {
+ final int valLower = 100;
+ final int valUpper = 200;
+
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, valLower);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, valUpper);
+
+ // Custom hash schema for the range: three buckets on the column "key".
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("key"), 3, 0);
+
+ CreateTableOptions builder = getBasicCreateTableOptions();
+ builder.addRangePartition(rangePartition);
+
+    // Add a table-wide schema with one dimension and five buckets.
+ builder.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+ final KuduTable table = client.createTable(tableName, basicSchema,
builder);
+ final PartitionSchema ps = table.getPartitionSchema();
+
+ // Should get the table-wide schema as the result when asking for a point
+ // in a non-covered range.
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 99);
+
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(5, s.get(0).getNumBuckets());
+ }
+
+ // The exact range boundary: should get the custom hash schema.
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 100);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(3, s.get(0).getNumBuckets());
+ }
+
+ // A value within the range: should get the custom hash schema.
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 101);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(3, s.get(0).getNumBuckets());
+ }
+
+ // Should get the table-wide schema as the result when asking for the
+ // upper exclusive boundary.
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 200);
+
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(5, s.get(0).getNumBuckets());
+ }
+
+ // Should get the table-wide schema as the result when asking for a point
+ // in a non-covered range.
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 300);
+
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      // There should be just one dimension with five buckets.
+ assertEquals(1, s.size());
+ assertEquals(5, s.get(0).getNumBuckets());
+ }
+ }
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testGetHashSchemaForRangeUnbounded() throws Exception {
+ // The test table is created with the following ranges:
+    //   (-inf, -100)  [-100, 0)  [0, 100)  [100, +inf)
+
+ CreateTableOptions builder = getBasicCreateTableOptions();
+    // Add a table-wide schema with one dimension and two buckets.
+ builder.addHashPartitions(ImmutableList.of("key"), 2, 0);
+
+ // Add range partition with custom hash schema: (-inf, -100)
+ {
+ PartialRow lower = basicSchema.newPartialRow();
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, -100);
+
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("key"), 3, 0);
+
+ builder.addRangePartition(rangePartition);
+ }
+
+ // Add range partition with table-wide hash schema: [-100, 0)
+ {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, -100);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, 0);
+
+ builder.addRangePartition(lower, upper);
+ }
+
+ // Add range partition with custom hash schema: [0, 100)
+ {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, 0);
+ PartialRow upper = basicSchema.newPartialRow();
+ upper.addInt(0, 100);
+
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+ builder.addRangePartition(rangePartition);
+ }
+
+ // Add range partition with table-wide hash schema: [100, +inf)
+ {
+ PartialRow lower = basicSchema.newPartialRow();
+ lower.addInt(0, 100);
+ PartialRow upper = basicSchema.newPartialRow();
+
+ builder.addRangePartition(lower, upper);
+ }
+
+ final KuduTable table = client.createTable(tableName, basicSchema,
builder);
+ final PartitionSchema ps = table.getPartitionSchema();
+
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, -2002);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(3, s.get(0).getNumBuckets());
+ }
+
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, -101);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(3, s.get(0).getNumBuckets());
+ }
+
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, -100);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(2, s.get(0).getNumBuckets());
+ }
+
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 0);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(5, s.get(0).getNumBuckets());
+ }
+
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 99);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(5, s.get(0).getNumBuckets());
+ }
+
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 100);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(2, s.get(0).getNumBuckets());
+ }
+
+ {
+ PartialRow row = table.getSchema().newPartialRow();
+ row.addInt(0, 1001);
+ final List<PartitionSchema.HashBucketSchema> s =
ps.getHashSchemaForRange(
+ KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+ assertEquals(1, s.size());
+ assertEquals(2, s.get(0).getNumBuckets());
+ }
+ }
+
@Test(timeout = 100000)
public void testFormatRangePartitionsCompoundColumns() throws Exception {
ArrayList<ColumnSchema> columns = new ArrayList<>();
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
index 8b6e0d792..5cb2c15a6 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
@@ -198,7 +198,7 @@ public class TestOperation {
lower, upper, RangePartitionBound.INCLUSIVE_BOUND,
RangePartitionBound.EXCLUSIVE_BOUND);
Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
- List<CreateTableOptions.RangePartition> decoded =
+ List<RangePartition> decoded =
dec.decodeRangePartitions(encoded, schema);
assertEquals(1, decoded.size());
assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
@@ -241,8 +241,7 @@ public class TestOperation {
lower, upper, RangePartitionBound.INCLUSIVE_BOUND,
RangePartitionBound.EXCLUSIVE_BOUND);
Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
- List<CreateTableOptions.RangePartition> decoded =
- dec.decodeRangePartitions(encoded, schema);
+ List<RangePartition> decoded = dec.decodeRangePartitions(encoded, schema);
assertEquals(1, decoded.size());
assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
decoded.get(0).getLowerBoundType());
@@ -294,8 +293,7 @@ public class TestOperation {
lower, upper, RangePartitionBound.INCLUSIVE_BOUND,
RangePartitionBound.EXCLUSIVE_BOUND);
Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
- List<CreateTableOptions.RangePartition> decoded =
- dec.decodeRangePartitions(encoded, schema);
+ List<RangePartition> decoded = dec.decodeRangePartitions(encoded, schema);
assertEquals(1, decoded.size());
assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
decoded.get(0).getLowerBoundType());
@@ -338,14 +336,14 @@ public class TestOperation {
columns.add(new ColumnSchema.ColumnSchemaBuilder("c1",
Type.INT64).build());
final Schema schema = new Schema(columns);
- List<CreateTableOptions.RangePartition> rangePartitions = new
ArrayList<>();
+ List<RangePartition> rangePartitions = new ArrayList<>();
{
final PartialRow lower = schema.newPartialRow();
lower.addInt("c0", 0);
final PartialRow upper = schema.newPartialRow();
upper.addInt("c0", 100);
- rangePartitions.add(new CreateTableOptions.RangePartition(
+ rangePartitions.add(new RangePartition(
lower,
upper,
RangePartitionBound.INCLUSIVE_BOUND,
@@ -357,7 +355,7 @@ public class TestOperation {
final PartialRow upper = schema.newPartialRow();
upper.addInt("c0", 300);
- rangePartitions.add(new CreateTableOptions.RangePartition(
+ rangePartitions.add(new RangePartition(
lower,
upper,
RangePartitionBound.EXCLUSIVE_BOUND,
@@ -369,8 +367,7 @@ public class TestOperation {
rangePartitions, ImmutableList.of());
Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
- List<CreateTableOptions.RangePartition> decoded =
- dec.decodeRangePartitions(encoded, schema);
+ List<RangePartition> decoded = dec.decodeRangePartitions(encoded, schema);
assertEquals(2, decoded.size());
assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
@@ -417,7 +414,7 @@ public class TestOperation {
columns.add(new ColumnSchema.ColumnSchemaBuilder("c3",
Type.STRING).nullable(true).build());
final Schema schema = new Schema(columns);
- List<CreateTableOptions.RangePartition> rangePartitions = new
ArrayList<>();
+ List<RangePartition> rangePartitions = new ArrayList<>();
{
final PartialRow lower = schema.newPartialRow();
lower.addInt("c0", 0);
@@ -426,7 +423,7 @@ public class TestOperation {
final PartialRow upper = schema.newPartialRow();
upper.addInt("c0", 100);
upper.addString("c1", "c");
- rangePartitions.add(new CreateTableOptions.RangePartition(
+ rangePartitions.add(new RangePartition(
lower,
upper,
RangePartitionBound.INCLUSIVE_BOUND,
@@ -440,7 +437,7 @@ public class TestOperation {
final PartialRow upper = schema.newPartialRow();
upper.addInt("c0", 300);
upper.addString("c1", "f");
- rangePartitions.add(new CreateTableOptions.RangePartition(
+ rangePartitions.add(new RangePartition(
lower,
upper,
RangePartitionBound.EXCLUSIVE_BOUND,
@@ -452,8 +449,7 @@ public class TestOperation {
rangePartitions, ImmutableList.of());
Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
- List<CreateTableOptions.RangePartition> decoded =
- dec.decodeRangePartitions(encoded, schema);
+ List<RangePartition> decoded = dec.decodeRangePartitions(encoded, schema);
assertEquals(2, decoded.size());
assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
index 4bc01b294..60e576a4a 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
@@ -17,6 +17,7 @@
package org.apache.kudu.client;
+import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
@@ -876,4 +877,222 @@ public class TestPartitionPruner {
checkPartitions(0, 0, table, partitions,
KuduPredicate.newIsNullPredicate(timestamp));
}
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.MasterServerConfig(flags = {
+ "--enable_per_range_hash_schemas=true",
+ })
+ public void testPruningWithCustomHashSchemas() throws Exception {
+ // CREATE TABLE timeseries
+ // (host STRING, metric STRING, timestamp UNIXTIME_MICROS, value DOUBLE)
+ // PRIMARY KEY (host, metric, timestamp)
+ //
+ // RANGE(timestamp)
+ // (PARTITION VALUES >= 0,
+ // PARTITION VALUES < 100)
+ // HASH (host, metric) 2 PARTITIONS,
+ //
+ // RANGE(timestamp)
+ // (PARTITION VALUES >= 100,
+ // PARTITION VALUES < 200)
+ // HASH (host) 5 PARTITIONS
+
+    ColumnSchema host =
+        new ColumnSchema.ColumnSchemaBuilder("host", Type.STRING).key(true).build();
+    ColumnSchema metric =
+        new ColumnSchema.ColumnSchemaBuilder("metric", Type.STRING).key(true).build();
+    ColumnSchema timestamp =
+        new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS)
+            .key(true).build();
+    ColumnSchema value = new ColumnSchema.ColumnSchemaBuilder("value", Type.DOUBLE).build();
+    final Schema schema = new Schema(ImmutableList.of(host, metric, timestamp, value));
+
+ CreateTableOptions tableBuilder = new CreateTableOptions();
+ tableBuilder.setRangePartitionColumns(ImmutableList.of("timestamp"));
+
+ // Add range partition with table-wide hash schema.
+ {
+ PartialRow lower = schema.newPartialRow();
+ lower.addLong("timestamp", 0);
+ PartialRow upper = schema.newPartialRow();
+ upper.addLong("timestamp", 100);
+ tableBuilder.addRangePartition(lower, upper);
+ }
+
+ // Add range partition with custom hash schema.
+ {
+ PartialRow lower = schema.newPartialRow();
+ lower.addLong("timestamp", 100);
+ PartialRow upper = schema.newPartialRow();
+ upper.addLong("timestamp", 200);
+
+ RangePartitionWithCustomHashSchema rangePartition =
+ new RangePartitionWithCustomHashSchema(
+ lower,
+ upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ rangePartition.addHashPartitions(ImmutableList.of("host"), 5, 0);
+
+ tableBuilder.addRangePartition(rangePartition);
+ }
+
+ // Add table-wide hash schema.
+ tableBuilder.addHashPartitions(ImmutableList.of("host", "metric"), 2);
+
+ String tableName = "testPruningCHS";
+ client.createTable(tableName, schema, tableBuilder);
+ KuduTable table = client.openTable(tableName);
+
+ final List<Partition> partitions = getTablePartitions(table);
+ assertEquals(7, partitions.size());
+
+ // No Predicates
+ checkPartitions(7, 9, table, partitions);
+
+ checkPartitions(7, 7, table, partitions,
+ KuduPredicate.newComparisonPredicate(timestamp,
ComparisonOp.GREATER_EQUAL, 50),
+ KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS,
150));
+
+    // host = "a"
+    // 2 tablets from the HASH(host, metric) range and 1 from the HASH(host) range.
+    checkPartitions(3, 5, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"));
+
+ // host = "a"
+ // metric = "a"
+ checkPartitions(2, 3, table, partitions,
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"));
+
+ // host = "a"
+ // metric = "a"
+ // timestamp >= 9;
+ checkPartitions(2, 3, table, partitions,
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(timestamp,
ComparisonOp.GREATER_EQUAL, 9));
+
+ // host = "a"
+ // metric = "a"
+ // timestamp >= 10;
+ // timestamp < 20;
+ checkPartitions(1, 1, table, partitions,
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(timestamp,
ComparisonOp.GREATER_EQUAL, 10),
+ KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS,
20));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 100;
+    // timestamp < 200;
+    // NOTE(review): the original hunk repeated the 10/20 predicates from the
+    // previous check while the comment said 100/200; constants aligned with
+    // the comment so the second range's custom hash schema is exercised.
+    checkPartitions(1, 1, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.GREATER_EQUAL, 100),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 200));
+
+ // host = "a"
+ // metric = "a"
+ // timestamp < 10;
+ checkPartitions(1, 1, table, partitions,
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS,
10));
+
+ // host = "a"
+ // metric = "a"
+ // timestamp < 100;
+ checkPartitions(1, 1, table, partitions,
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS,
100));
+
+ // host = "a"
+ // metric = "a"
+ // timestamp >= 10;
+ checkPartitions(2, 3, table, partitions,
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(timestamp,
ComparisonOp.GREATER_EQUAL, 10));
+
+ // host = "a"
+ // metric = "a"
+ // timestamp >= 100;
+ checkPartitions(1, 2, table, partitions,
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(timestamp,
ComparisonOp.GREATER_EQUAL, 100));
+
+ // host = "a"
+ // metric = "a"
+ // timestamp = 100;
+ checkPartitions(1, 1, table, partitions,
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL,
100));
+
+ final byte[] hash1 = new byte[] { 0, 0, 0, 1 };
+
+ // partition key < (hash=1)
+ // scan partitions: 1 + 1 + 1
+ checkPartitions(2, 3, table, partitions, null, hash1);
+
+ // partition key >= (hash=1)
+ // scan partitions: 1 + 4 + 1
+ checkPartitions(5, 6, table, partitions, hash1, null);
+
+ // timestamp = 10
+ // partition key < (hash=1)
+ // scan partitions: 0 + 1 + 0
+ checkPartitions(1, 1, table, partitions, null, hash1,
+ KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL,
10));
+
+ // timestamp = 10
+ // partition key >= (hash=1)
+ checkPartitions(1, 1, table, partitions, hash1, null,
+ KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL,
10));
+
+ // timestamp = 100
+ // partition key >= (hash=1)
+ checkPartitions(4, 4, table, partitions, hash1, null,
+ KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL,
100));
+
+ // timestamp IN (99, 100)
+ // host = "a"
+ // metric IN ("foo", "baz")
+ checkPartitions(2, 2, table, partitions,
+ KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(99L,
100L)),
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newInListPredicate(metric, ImmutableList.of("foo",
"baz")));
+
+ // timestamp IN (100, 199)
+ // host = "a"
+ // metric IN ("foo", "baz")
+ checkPartitions(1, 1, table, partitions,
+ KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(100L,
199L)),
+ KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+ KuduPredicate.newInListPredicate(metric, ImmutableList.of("foo",
"baz")));
+
+ // timestamp IN (0, 10)
+ checkPartitions(2, 2, table, partitions,
+ KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(0L,
10L)));
+
+ // timestamp IN (100, 110)
+ checkPartitions(5, 5, table, partitions,
+ KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(100L,
110L)));
+
+ // timestamp IN (99, 100)
+ checkPartitions(7, 7, table, partitions,
+ KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(99L,
100L)));
+
+ // timestamp IS NOT NULL
+ checkPartitions(7, 9, table, partitions,
+ KuduPredicate.newIsNotNullPredicate(timestamp));
+
+ // timestamp IS NULL
+ checkPartitions(0, 0, table, partitions,
+ KuduPredicate.newIsNullPredicate(timestamp));
+ }
}