This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 483738566 KUDU-2671: Backup/restore for custom hash schema
483738566 is described below
commit 483738566d6879f01e85e6f5c96d9a379f6128d7
Author: Mahesh Reddy <[email protected]>
AuthorDate: Tue Jul 19 13:55:03 2022 -0700
KUDU-2671: Backup/restore for custom hash schema
This patch ensures that the ranges with custom
hash schemas are also backed up when the partition
schema is backed up.
Change-Id: I7b8e0481e4e9bc0ac0e5fcbf085976325dd55b0d
Reviewed-on: http://gerrit.cloudera.org:8080/18753
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
.../src/main/protobuf/backup.proto | 8 ++++
.../org/apache/kudu/backup/TableMetadata.scala | 49 +++++++++++++++++++++-
.../org/apache/kudu/backup/TestKuduBackup.scala | 48 ++++++++++++++++++++-
.../org/apache/kudu/client/PartitionSchema.java | 2 +-
.../org/apache/kudu/spark/kudu/KuduTestSuite.scala | 34 ++++++++++++++-
5 files changed, 137 insertions(+), 4 deletions(-)
diff --git a/java/kudu-backup-common/src/main/protobuf/backup.proto b/java/kudu-backup-common/src/main/protobuf/backup.proto
index b0fa6f1df..0fa9baecd 100644
--- a/java/kudu-backup-common/src/main/protobuf/backup.proto
+++ b/java/kudu-backup-common/src/main/protobuf/backup.proto
@@ -89,11 +89,19 @@ message HashPartitionMetadataPB {
int32 seed = 3;
}
+// Maps to RangeWithHashSchema class.
+// The fields are effectively 1 to 1 mappings of those in RangeWithHashSchema.
+message RangeAndHashPartitionMetadataPB {
+ RangeBoundsMetadataPB bounds = 1;
+ repeated HashPartitionMetadataPB hash_partitions = 2;
+}
+
// Maps to PartitionSchema class.
// The fields are effectively 1 to 1 mappings of those in PartitionSchema.
message PartitionSchemaMetadataPB {
repeated HashPartitionMetadataPB hash_partitions = 1;
RangePartitionMetadataPB range_partitions = 2;
+ repeated RangeAndHashPartitionMetadataPB range_and_hash_partitions = 3;
}
// Maps to Partition class.
diff --git a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index 3bd2accd6..501b12444 100644
--- a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -39,6 +39,7 @@ import org.apache.kudu.Type
import org.apache.kudu.client.KuduPartitioner.KuduPartitionerBuilder
import org.apache.kudu.client.PartitionSchema.HashBucketSchema
import org.apache.kudu.client.PartitionSchema.RangeSchema
+import org.apache.kudu.client.PartitionSchema.RangeWithHashSchema
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
@@ -121,13 +122,49 @@ object TableMetadata {
private def getPartitionSchemaMetadata(table: KuduTable): PartitionSchemaMetadataPB = {
val hashPartitions = getHashPartitionsMetadata(table)
val rangePartitions = getRangePartitionMetadata(table)
+ val rangeAndHashPartitions = getRangeAndHashPartitionsMetadata(table)
PartitionSchemaMetadataPB
.newBuilder()
.addAllHashPartitions(hashPartitions.asJava)
.setRangePartitions(rangePartitions)
+ .addAllRangeAndHashPartitions(rangeAndHashPartitions.asJava)
.build()
}
+ private def getRangeAndHashPartitionsMetadata(
+ table: KuduTable): Seq[RangeAndHashPartitionMetadataPB] = {
+ val tableSchema = table.getSchema
+ val partitionSchema = table.getPartitionSchema
+ val rangeColumnNames = partitionSchema.getRangeSchema.getColumnIds.asScala.map { id =>
+ getColumnById(tableSchema, id).getName
+ }
+ partitionSchema.getRangesWithHashSchemas.asScala.map { rhs =>
+ val hashSchemas = rhs.hashSchemas.asScala.map { hs =>
+ val hashColumnNames = hs.getColumnIds.asScala.map { id =>
+ getColumnById(tableSchema, id).getName
+ }
+ HashPartitionMetadataPB
+ .newBuilder()
+ .addAllColumnNames(hashColumnNames.asJava)
+ .setNumBuckets(hs.getNumBuckets)
+ .setSeed(hs.getSeed)
+ .build()
+ }
+ val upperValues = getBoundValues(rhs.upperBound, rangeColumnNames, tableSchema)
+ val lowerValues = getBoundValues(rhs.lowerBound, rangeColumnNames, tableSchema)
+ val bounds = RangeBoundsMetadataPB
+ .newBuilder()
+ .addAllUpperBounds(upperValues.asJava)
+ .addAllLowerBounds(lowerValues.asJava)
+ .build()
+ RangeAndHashPartitionMetadataPB
+ .newBuilder()
+ .setBounds(bounds)
+ .addAllHashPartitions(hashSchemas.asJava)
+ .build()
+ }
+ }
+
private def getHashPartitionsMetadata(table: KuduTable): Seq[HashPartitionMetadataPB] = {
val tableSchema = table.getSchema
val partitionSchema = table.getPartitionSchema
@@ -365,6 +402,16 @@ object TableMetadata {
val colIds = hp.getColumnNamesList.asScala.map(colNameToId)
new HashBucketSchema(colIds.asJava, hp.getNumBuckets, hp.getSeed)
}
- new PartitionSchema(rangeSchema, hashSchemas.asJava, schema)
+ val rangesWithHashSchemas =
+ metadata.getPartitions.getRangeAndHashPartitionsList.asScala.map { rhp =>
+ val rangeHashSchemas = rhp.getHashPartitionsList.asScala.map { hp =>
+ val colIds = hp.getColumnNamesList.asScala.map(colNameToId)
+ new HashBucketSchema(colIds.asJava, hp.getNumBuckets, hp.getSeed)
+ }
+ val lower = getPartialRow(rhp.getBounds.getLowerBoundsList.asScala, schema)
+ val upper = getPartialRow(rhp.getBounds.getUpperBoundsList.asScala, schema)
+ new RangeWithHashSchema(lower, upper, rangeHashSchemas.asJava)
+ }
+ new PartitionSchema(rangeSchema, hashSchemas.asJava, rangesWithHashSchemas.asJava, schema)
}
}
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 21776225f..40d00c794 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -19,7 +19,6 @@ package org.apache.kudu.backup
import java.nio.file.Files
import java.nio.file.Path
import java.util
-
import com.google.common.base.Objects
import org.apache.commons.io.FileUtils
import org.apache.kudu.client.PartitionSchema.HashBucketSchema
@@ -590,6 +589,38 @@ class TestKuduBackup extends KuduTestSuite {
restoreAndValidateTable(tableName, 50)
}
+ @Test
+ def testTableWithCustomHashSchemas(): Unit = {
+ // Create the initial table and load it with data.
+ val tableName = "testTableWithCustomHashSchemas"
+ var table = kuduClient.createTable(tableName, schema, tableOptionsWithCustomHashSchema)
+ insertRows(table, 100)
+
+ // Run and validate initial backup.
+ backupAndValidateTable(tableName, 100, false)
+
+ // Rename the table and insert more rows
+ val newTableName = "impala::default.testTableWithCustomHashSchemas"
+ kuduClient.alterTable(tableName, new AlterTableOptions().renameTable(newTableName))
+ table = kuduClient.openTable(newTableName)
+ insertRows(table, 100, 100)
+
+ // Run and validate an incremental backup.
+ backupAndValidateTable(newTableName, 100, true)
+
+ // Create a new table with the old name.
+ val tableWithOldName =
+ kuduClient.createTable(tableName, schema, tableOptionsWithCustomHashSchema)
+ insertRows(tableWithOldName, 50)
+
+ // Backup the table with the old name.
+ backupAndValidateTable(tableName, 50, false)
+
+ // Restore the tables and check the row counts.
+ restoreAndValidateTable(newTableName, 200)
+ restoreAndValidateTable(tableName, 50)
+ }
+
@Test
def testTableNameChangeFlags() {
// Create four tables and load data
@@ -893,6 +924,21 @@ class TestKuduBackup extends KuduTestSuite {
val hashBucketsMatch = (0 until beforeBuckets.size).forall { i =>
HashBucketSchemasMatch(beforeBuckets(i), afterBuckets(i))
}
+ val beforeRangeHashSchemas = before.getRangesWithHashSchemas.asScala
+ val afterRangeHashSchemas = after.getRangesWithHashSchemas.asScala
+ if (beforeRangeHashSchemas.size != afterRangeHashSchemas.size) return false
+ for (i <- 0 until beforeRangeHashSchemas.size) {
+ val beforeHashSchemas = beforeRangeHashSchemas(i).hashSchemas.asScala
+ val afterHashSchemas = afterRangeHashSchemas(i).hashSchemas.asScala
+ if (beforeHashSchemas.size != afterHashSchemas.size) return false
+ for (j <- 0 until beforeHashSchemas.size) {
+ if (!HashBucketSchemasMatch(beforeHashSchemas(j), afterHashSchemas(j))) return false
+ }
+ if (!Objects.equal(beforeRangeHashSchemas(i).lowerBound, afterRangeHashSchemas(i).lowerBound)
+ || !Objects.equal(beforeRangeHashSchemas(i).upperBound, afterRangeHashSchemas(i).upperBound))
+ return false
+ }
hashBucketsMatch &&
Objects.equal(before.getRangeSchema.getColumnIds,
after.getRangeSchema.getColumnIds)
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
index aa234110c..e62ac8dbf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
@@ -177,7 +177,7 @@ public class PartitionSchema {
return hashBucketSchemas;
}
- List<RangeWithHashSchema> getRangesWithHashSchemas() {
+ public List<RangeWithHashSchema> getRangesWithHashSchemas() {
return rangesWithHashSchemas;
}
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index ebf41a419..fdecc2213 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -20,7 +20,6 @@ package org.apache.kudu.spark.kudu
import java.math.BigDecimal
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Date
-
import scala.collection.JavaConverters._
import scala.collection.immutable.IndexedSeq
import org.apache.spark.SparkConf
@@ -29,6 +28,8 @@ import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.client.KuduClient
import org.apache.kudu.client.KuduTable
+import org.apache.kudu.client.RangePartitionBound
+import org.apache.kudu.client.RangePartitionWithCustomHashSchema
import org.apache.kudu.Schema
import org.apache.kudu.Type
import org.apache.kudu.test.KuduTestHarness
@@ -120,6 +121,37 @@ trait KuduTestSuite {
.setNumReplicas(1)
}
+ val tableOptionsWithCustomHashSchema: CreateTableOptions = {
+ val bottom = schema.newPartialRow()
+ bottom.addInt("key", 0)
+ val middle = schema.newPartialRow()
+ middle.addInt("key", 50)
+ val top = schema.newPartialRow()
+ top.addInt("key", 200)
+
+ val columns = List("key").asJava
+ val partitionFirst = new RangePartitionWithCustomHashSchema(
+ bottom,
+ middle,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND)
+ partitionFirst.addHashPartitions(columns, 2, 0)
+ val partitionSecond = new RangePartitionWithCustomHashSchema(
+ middle,
+ top,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND)
+ partitionSecond.addHashPartitions(columns, 3, 0)
+
+ new CreateTableOptions()
+ .setRangePartitionColumns(columns)
+ .addRangePartition(partitionFirst)
+ .addRangePartition(partitionSecond)
+ .addHashPartitions(columns, 4, 0)
+ .setOwner(owner)
+ .setNumReplicas(1)
+ }
+
val appID: String = new Date().toString + math
.floor(math.random * 10E4)
.toLong