This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 483738566 KUDU-2671: Backup/restore for custom hash schema
483738566 is described below

commit 483738566d6879f01e85e6f5c96d9a379f6128d7
Author: Mahesh Reddy <[email protected]>
AuthorDate: Tue Jul 19 13:55:03 2022 -0700

    KUDU-2671: Backup/restore for custom hash schema
    
    This patch ensures that the ranges with custom
    hash schemas are also backed up when the partition
    schema is backed up.
    
    Change-Id: I7b8e0481e4e9bc0ac0e5fcbf085976325dd55b0d
    Reviewed-on: http://gerrit.cloudera.org:8080/18753
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../src/main/protobuf/backup.proto                 |  8 ++++
 .../org/apache/kudu/backup/TableMetadata.scala     | 49 +++++++++++++++++++++-
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 48 ++++++++++++++++++++-
 .../org/apache/kudu/client/PartitionSchema.java    |  2 +-
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala | 34 ++++++++++++++-
 5 files changed, 137 insertions(+), 4 deletions(-)

diff --git a/java/kudu-backup-common/src/main/protobuf/backup.proto 
b/java/kudu-backup-common/src/main/protobuf/backup.proto
index b0fa6f1df..0fa9baecd 100644
--- a/java/kudu-backup-common/src/main/protobuf/backup.proto
+++ b/java/kudu-backup-common/src/main/protobuf/backup.proto
@@ -89,11 +89,19 @@ message HashPartitionMetadataPB {
   int32 seed = 3;
 }
 
+// Maps to RangeWithHashSchema class.
+// The fields are effectively 1 to 1 mappings of those in RangeWithHashSchema.
+message RangeAndHashPartitionMetadataPB {
+  RangeBoundsMetadataPB bounds = 1;
+  repeated HashPartitionMetadataPB hash_partitions = 2;
+}
+
 // Maps to PartitionSchema class.
 // The fields are effectively 1 to 1 mappings of those in PartitionSchema.
 message PartitionSchemaMetadataPB {
   repeated HashPartitionMetadataPB hash_partitions = 1;
   RangePartitionMetadataPB range_partitions = 2;
+  repeated RangeAndHashPartitionMetadataPB range_and_hash_partitions = 3;
 }
 
 // Maps to Partition class.
diff --git 
a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
 
b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index 3bd2accd6..501b12444 100644
--- 
a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ 
b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -39,6 +39,7 @@ import org.apache.kudu.Type
 import org.apache.kudu.client.KuduPartitioner.KuduPartitionerBuilder
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema
 import org.apache.kudu.client.PartitionSchema.RangeSchema
+import org.apache.kudu.client.PartitionSchema.RangeWithHashSchema
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
 
@@ -121,13 +122,49 @@ object TableMetadata {
   private def getPartitionSchemaMetadata(table: KuduTable): 
PartitionSchemaMetadataPB = {
     val hashPartitions = getHashPartitionsMetadata(table)
     val rangePartitions = getRangePartitionMetadata(table)
+    val rangeAndHashPartitions = getRangeAndHashPartitionsMetadata(table)
     PartitionSchemaMetadataPB
       .newBuilder()
       .addAllHashPartitions(hashPartitions.asJava)
       .setRangePartitions(rangePartitions)
+      .addAllRangeAndHashPartitions(rangeAndHashPartitions.asJava)
       .build()
   }
 
+  private def getRangeAndHashPartitionsMetadata(
+      table: KuduTable): Seq[RangeAndHashPartitionMetadataPB] = {
+    val tableSchema = table.getSchema
+    val partitionSchema = table.getPartitionSchema
+    val rangeColumnNames = 
partitionSchema.getRangeSchema.getColumnIds.asScala.map { id =>
+      getColumnById(tableSchema, id).getName
+    }
+    partitionSchema.getRangesWithHashSchemas.asScala.map { rhs =>
+      val hashSchemas = rhs.hashSchemas.asScala.map { hs =>
+        val hashColumnNames = hs.getColumnIds.asScala.map { id =>
+          getColumnById(tableSchema, id).getName
+        }
+        HashPartitionMetadataPB
+          .newBuilder()
+          .addAllColumnNames(hashColumnNames.asJava)
+          .setNumBuckets(hs.getNumBuckets)
+          .setSeed(hs.getSeed)
+          .build()
+      }
+      val upperValues = getBoundValues(rhs.upperBound, rangeColumnNames, 
tableSchema)
+      val lowerValues = getBoundValues(rhs.lowerBound, rangeColumnNames, 
tableSchema)
+      val bounds = RangeBoundsMetadataPB
+        .newBuilder()
+        .addAllUpperBounds(upperValues.asJava)
+        .addAllLowerBounds(lowerValues.asJava)
+        .build()
+      RangeAndHashPartitionMetadataPB
+        .newBuilder()
+        .setBounds(bounds)
+        .addAllHashPartitions(hashSchemas.asJava)
+        .build()
+    }
+  }
+
   private def getHashPartitionsMetadata(table: KuduTable): 
Seq[HashPartitionMetadataPB] = {
     val tableSchema = table.getSchema
     val partitionSchema = table.getPartitionSchema
@@ -365,6 +402,16 @@ object TableMetadata {
       val colIds = hp.getColumnNamesList.asScala.map(colNameToId)
       new HashBucketSchema(colIds.asJava, hp.getNumBuckets, hp.getSeed)
     }
-    new PartitionSchema(rangeSchema, hashSchemas.asJava, schema)
+    val rangesWithHashSchemas =
+      metadata.getPartitions.getRangeAndHashPartitionsList.asScala.map { rhp =>
+        val rangeHashSchemas = rhp.getHashPartitionsList.asScala.map { hp =>
+          val colIds = hp.getColumnNamesList.asScala.map(colNameToId)
+          new HashBucketSchema(colIds.asJava, hp.getNumBuckets, hp.getSeed)
+        }
+        val lower = getPartialRow(rhp.getBounds.getLowerBoundsList.asScala, 
schema)
+        val upper = getPartialRow(rhp.getBounds.getUpperBoundsList.asScala, 
schema)
+        new RangeWithHashSchema(lower, upper, rangeHashSchemas.asJava)
+      }
+    new PartitionSchema(rangeSchema, hashSchemas.asJava, 
rangesWithHashSchemas.asJava, schema)
   }
 }
diff --git 
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala 
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 21776225f..40d00c794 100644
--- 
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ 
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -19,7 +19,6 @@ package org.apache.kudu.backup
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util
-
 import com.google.common.base.Objects
 import org.apache.commons.io.FileUtils
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema
@@ -590,6 +589,38 @@ class TestKuduBackup extends KuduTestSuite {
     restoreAndValidateTable(tableName, 50)
   }
 
+  @Test
+  def testTableWithCustomHashSchemas(): Unit = {
+    // Create the initial table and load it with data.
+    val tableName = "testTableWithCustomHashSchemas"
+    var table = kuduClient.createTable(tableName, schema, 
tableOptionsWithCustomHashSchema)
+    insertRows(table, 100)
+
+    // Run and validate initial backup.
+    backupAndValidateTable(tableName, 100, false)
+
+    // Rename the table and insert more rows
+    val newTableName = "impala::default.testTableWithCustomHashSchemas"
+    kuduClient.alterTable(tableName, new 
AlterTableOptions().renameTable(newTableName))
+    table = kuduClient.openTable(newTableName)
+    insertRows(table, 100, 100)
+
+    // Run and validate an incremental backup.
+    backupAndValidateTable(newTableName, 100, true)
+
+    // Create a new table with the old name.
+    val tableWithOldName =
+      kuduClient.createTable(tableName, schema, 
tableOptionsWithCustomHashSchema)
+    insertRows(tableWithOldName, 50)
+
+    // Backup the table with the old name.
+    backupAndValidateTable(tableName, 50, false)
+
+    // Restore the tables and check the row counts.
+    restoreAndValidateTable(newTableName, 200)
+    restoreAndValidateTable(tableName, 50)
+  }
+
   @Test
   def testTableNameChangeFlags() {
     // Create four tables and load data
@@ -893,6 +924,21 @@ class TestKuduBackup extends KuduTestSuite {
     val hashBucketsMatch = (0 until beforeBuckets.size).forall { i =>
       HashBucketSchemasMatch(beforeBuckets(i), afterBuckets(i))
     }
+    val beforeRangeHashSchemas = before.getRangesWithHashSchemas.asScala
+    val afterRangeHashSchemas = after.getRangesWithHashSchemas.asScala
+    if (beforeRangeHashSchemas.size != afterRangeHashSchemas.size) return false
+    for (i <- 0 until beforeRangeHashSchemas.size) {
+      val beforeHashSchemas = beforeRangeHashSchemas(i).hashSchemas.asScala
+      val afterHashSchemas = afterRangeHashSchemas(i).hashSchemas.asScala
+      if (beforeHashSchemas.size != afterHashSchemas.size) return false
+      for (j <- 0 until beforeHashSchemas.size) {
+        if (!HashBucketSchemasMatch(beforeHashSchemas(j), 
afterHashSchemas(j))) return false
+      }
+      if (!Objects.equal(beforeRangeHashSchemas(i).lowerBound, 
afterRangeHashSchemas(i).lowerBound)
+        || !Objects
+          .equal(beforeRangeHashSchemas(i).upperBound, 
afterRangeHashSchemas(i).upperBound))
+        return false
+    }
     hashBucketsMatch &&
     Objects.equal(before.getRangeSchema.getColumnIds, 
after.getRangeSchema.getColumnIds)
   }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
index aa234110c..e62ac8dbf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
@@ -177,7 +177,7 @@ public class PartitionSchema {
     return hashBucketSchemas;
   }
 
-  List<RangeWithHashSchema> getRangesWithHashSchemas() {
+  public List<RangeWithHashSchema> getRangesWithHashSchemas() {
     return rangesWithHashSchemas;
   }
 
diff --git 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index ebf41a419..fdecc2213 100644
--- 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -20,7 +20,6 @@ package org.apache.kudu.spark.kudu
 import java.math.BigDecimal
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Date
-
 import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
 import org.apache.spark.SparkConf
@@ -29,6 +28,8 @@ import 
org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
 import org.apache.kudu.client.CreateTableOptions
 import org.apache.kudu.client.KuduClient
 import org.apache.kudu.client.KuduTable
+import org.apache.kudu.client.RangePartitionBound
+import org.apache.kudu.client.RangePartitionWithCustomHashSchema
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.test.KuduTestHarness
@@ -120,6 +121,37 @@ trait KuduTestSuite {
       .setNumReplicas(1)
   }
 
+  val tableOptionsWithCustomHashSchema: CreateTableOptions = {
+    val bottom = schema.newPartialRow()
+    bottom.addInt("key", 0)
+    val middle = schema.newPartialRow()
+    middle.addInt("key", 50)
+    val top = schema.newPartialRow()
+    top.addInt("key", 200)
+
+    val columns = List("key").asJava
+    val partitionFirst = new RangePartitionWithCustomHashSchema(
+      bottom,
+      middle,
+      RangePartitionBound.INCLUSIVE_BOUND,
+      RangePartitionBound.EXCLUSIVE_BOUND)
+    partitionFirst.addHashPartitions(columns, 2, 0)
+    val partitionSecond = new RangePartitionWithCustomHashSchema(
+      middle,
+      top,
+      RangePartitionBound.INCLUSIVE_BOUND,
+      RangePartitionBound.EXCLUSIVE_BOUND)
+    partitionSecond.addHashPartitions(columns, 3, 0)
+
+    new CreateTableOptions()
+      .setRangePartitionColumns(columns)
+      .addRangePartition(partitionFirst)
+      .addRangePartition(partitionSecond)
+      .addHashPartitions(columns, 4, 0)
+      .setOwner(owner)
+      .setNumReplicas(1)
+  }
+
   val appID: String = new Date().toString + math
     .floor(math.random * 10E4)
     .toLong

Reply via email to