This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new bd37d60  [backup] KUDU-3183 Add --newDatabaseName and 
--removeImpalaPrefix options to restore job
bd37d60 is described below

commit bd37d601d36bcf51c595baad52b5d24b5e20d684
Author: Abhishek Chennaka <[email protected]>
AuthorDate: Mon May 3 11:45:41 2021 -0400

    [backup] KUDU-3183 Add --newDatabaseName and --removeImpalaPrefix
    options to restore job
    
    While Kudu has no notion of a database, the full table name is
    usually stored as <database>.<tablename> on the Kudu side (NOTE:
    the database name is optional). Using the options in this patch,
    users can change the existing database name of a table or add a
    new database name to it, i.e. the part before the '.' in the full
    table name, as well as remove the "impala::" prefix from the tables
    which are being restored.
    
    1. --newDatabaseName: Use this option to specify the new database
    name for the restored table. This will overwrite any existing
    database name and, if there is no existing database name, a new
    one will be added to the table name. It does not affect the
    "impala::" prefix. E.g.: setting the database name "newDB" for the
    tables impala::default.test, impala::test, default.test and test
    will result in the table names impala::newDB.test,
    impala::newDB.test, newDB.test and newDB.test.
    This will not affect the existing/source tables.
    
    2. --removeImpalaPrefix: If enabled, this option will remove the
    "impala::" prefix, if present, from the restored table names. This is
    advisable if tables were backed up in Kudu clusters without HMS
    integration and are being restored to Kudu clusters with HMS integration.
    
    Change-Id: I65adcc1b3de0a8e1ac5b7f50a2d3a7036aa69421
    Reviewed-on: http://gerrit.cloudera.org:8080/17388
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <[email protected]>
---
 .../scala/org/apache/kudu/backup/KuduRestore.scala | 36 ++++++++++-
 .../scala/org/apache/kudu/backup/Options.scala     | 21 +++++++
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 69 +++++++++++++++++++++-
 .../scala/org/apache/kudu/backup/TestOptions.scala |  4 ++
 4 files changed, 126 insertions(+), 4 deletions(-)

diff --git 
a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala 
b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index ef01a8d..92d0717 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -44,6 +44,40 @@ import scala.util.Try
 @InterfaceStability.Unstable
 object KuduRestore {
   val log: Logger = LoggerFactory.getLogger(getClass)
+  val ImpalaPrefix = "impala::"
+
+  /**
+   * Returns the table name in which the data will be restored considering the 
flags removeImpalaPrefix,
+   * newDatabaseName and tableSuffix
+   */
+  def getRestoreTableName(fullTableName: String, options: RestoreOptions): 
String = {
+    // Break the table down into prefix::databaseName.tableName
+    var prefix = ""
+    var databaseName = ""
+    var tableName = fullTableName
+    val hasImpalaPrefix = tableName.startsWith(ImpalaPrefix)
+    if (hasImpalaPrefix) {
+      prefix = ImpalaPrefix
+      tableName = tableName.substring(ImpalaPrefix.length)
+    }
+    val hasDatabase = tableName.contains(".")
+    if (hasDatabase) {
+      databaseName = tableName.substring(0, tableName.indexOf(".") + 1)
+      tableName = tableName.substring(tableName.indexOf(".") + 1)
+    }
+
+    // If the user does not want the Impala prefix, drop it
+    if (options.removeImpalaPrefix) {
+      prefix = ""
+    }
+
+    // If there is a databaseName specified by the user, use that
+    if (options.newDatabaseName.nonEmpty) {
+      databaseName = options.newDatabaseName.concat(".")
+    }
+
+    s"${prefix}${databaseName}${tableName}${options.tableSuffix}"
+  }
 
   private def doRestore(
       tableName: String,
@@ -58,7 +92,7 @@ object KuduRestore {
     val graph = backupMap(tableName)
     val restorePath = graph.restorePath
     val lastMetadata = restorePath.backups.last.metadata
-    val restoreName = s"${lastMetadata.getTableName}${options.tableSuffix}"
+    val restoreName = getRestoreTableName(lastMetadata.getTableName, options)
     val numJobs = restorePath.backups.size
     var currentJob = 1
     restorePath.backups.foreach { backup =>
diff --git 
a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala 
b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index 2c9762a..5201e60 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -186,6 +186,8 @@ case class RestoreOptions(
     tables: Seq[String],
     rootPath: String,
     kuduMasterAddresses: String = 
InetAddress.getLocalHost.getCanonicalHostName,
+    removeImpalaPrefix: Boolean = RestoreOptions.DefaultRemoveImpalaPrefix,
+    newDatabaseName: String = "",
     tableSuffix: String = "",
     createTables: Boolean = RestoreOptions.DefaultCreateTables,
     timestampMs: Long = System.currentTimeMillis(),
@@ -194,6 +196,7 @@ case class RestoreOptions(
     restoreOwner: Boolean = RestoreOptions.DefaultRestoreOwner)
 
 object RestoreOptions {
+  val DefaultRemoveImpalaPrefix: Boolean = false
   val DefaultCreateTables: Boolean = true
   val DefaultFailOnFirstError = false
   val DefaultNumParallelRestores = 1
@@ -221,6 +224,24 @@ object RestoreOptions {
           "already exist. Default: " + DefaultCreateTables)
         .optional()
 
+      opt[Boolean]("removeImpalaPrefix")
+        .action((v, o) => o.copy(removeImpalaPrefix = v))
+        .text("If true, removes the \"impala::\" prefix, if present from the 
restored table names. This is " +
+          "advisable if backup was taken in a Kudu cluster without HMS sync 
and restoring to " +
+          "Kudu cluster which has HMS sync in place. Only used when 
createTables is true. Default: " +
+          DefaultRemoveImpalaPrefix)
+        .optional()
+
+      opt[String]("newDatabaseName")
+        .action((v, o) => o.copy(newDatabaseName = v))
+        .text(
+          "If set, replaces the existing database name and if there is no 
existing database name, a new database " +
+            "name is added. Setting this to an empty string will have the same 
effect of not using the flag at all. " +
+            "For example, if this is set to newdb for the tables testtable and 
impala::db.testtable the restored " +
+            "tables will have the names newdb.testtable and 
impala::newdb.testtable respectively, assuming " +
+            "removeImpalaPrefix is set to false")
+        .optional()
+
       opt[String]("tableSuffix")
         .action((v, o) => o.copy(tableSuffix = v))
         .text("If set, the suffix to add to the restored table names. Only 
used when " +
diff --git 
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala 
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 5ecd468..2177622 100644
--- 
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ 
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -591,6 +591,70 @@ class TestKuduBackup extends KuduTestSuite {
   }
 
   @Test
+  def testTableNameChangeFlags() {
+    // Create four tables and load data
+    val rowCount = 100
+    val tableNameWithImpalaPrefix = 
"impala::oldDatabase.testTableWithImpalaPrefix"
+    val tableWithImpalaPrefix =
+      kuduClient.createTable(tableNameWithImpalaPrefix, schema, tableOptions)
+    val tableNameWithoutImpalaPrefix = 
"oldDatabase.testTableWithoutImpalaPrefix"
+    val tableWithoutImpalaPrefix =
+      kuduClient.createTable(tableNameWithoutImpalaPrefix, schema, 
tableOptions)
+    val tableNameWithImpalaPrefixWithoutDb = 
"impala::testTableWithImpalaPrefixWithoutDb"
+    val tableWithImpalaPrefixWithoutDb =
+      kuduClient.createTable(tableNameWithImpalaPrefixWithoutDb, schema, 
tableOptions)
+    val tableNameWithoutImpalaPrefixWithoutDb = 
"testTableWithoutImpalaPrefixWithoutDb"
+    val tableWithoutImpalaPrefixWithoutDb =
+      kuduClient.createTable(tableNameWithoutImpalaPrefixWithoutDb, schema, 
tableOptions)
+    insertRows(tableWithImpalaPrefix, rowCount)
+    insertRows(tableWithoutImpalaPrefix, rowCount)
+    insertRows(tableWithImpalaPrefixWithoutDb, rowCount)
+    insertRows(tableWithoutImpalaPrefixWithoutDb, rowCount)
+
+    // Backup the four tables
+    backupAndValidateTable(tableNameWithImpalaPrefix, rowCount, false)
+    backupAndValidateTable(tableNameWithoutImpalaPrefix, rowCount, false)
+    backupAndValidateTable(tableNameWithImpalaPrefixWithoutDb, rowCount, false)
+    backupAndValidateTable(tableNameWithoutImpalaPrefixWithoutDb, rowCount, 
false)
+
+    // Restore with removeImpalaPrefix = true and newDatabase = newDatabase 
and validate the tables
+    val withImpalaPrefix =
+      createRestoreOptions(Seq(tableNameWithImpalaPrefix))
+        .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase")
+    assertTrue(runRestore(withImpalaPrefix))
+    val rddWithImpalaPrefix =
+      kuduContext
+        .kuduRDD(ss.sparkContext, 
s"newDatabase.testTableWithImpalaPrefix-restore")
+    assertEquals(rddWithImpalaPrefix.collect.length, rowCount)
+
+    val withoutImpalaPrefix =
+      createRestoreOptions(Seq(tableNameWithoutImpalaPrefix))
+        .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase")
+    assertTrue(runRestore(withoutImpalaPrefix))
+    val rddWithoutImpalaPrefix =
+      kuduContext.kuduRDD(ss.sparkContext, 
s"newDatabase.testTableWithoutImpalaPrefix-restore")
+    assertEquals(rddWithoutImpalaPrefix.collect.length, rowCount)
+
+    val withImpalaPrefixWithoutDb =
+      createRestoreOptions(Seq(tableNameWithImpalaPrefixWithoutDb))
+        .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase")
+    assertTrue(runRestore(withImpalaPrefixWithoutDb))
+    val rddWithImpalaPrefixWithoutDb =
+      kuduContext
+        .kuduRDD(ss.sparkContext, 
s"newDatabase.testTableWithImpalaPrefixWithoutDb-restore")
+    assertEquals(rddWithImpalaPrefixWithoutDb.collect.length, rowCount)
+
+    val withoutImpalaPrefixWithoutDb =
+      createRestoreOptions(Seq(tableNameWithoutImpalaPrefixWithoutDb))
+        .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase")
+    assertTrue(runRestore(withoutImpalaPrefixWithoutDb))
+    val rddWithoutImpalaPrefixWithoutDb =
+      kuduContext
+        .kuduRDD(ss.sparkContext, 
s"newDatabase.testTableWithoutImpalaPrefixWithoutDb-restore")
+    assertEquals(rddWithoutImpalaPrefixWithoutDb.collect.length, rowCount)
+  }
+
+  @Test
   def testDeleteIgnore(): Unit = {
     doDeleteIgnoreTest()
   }
@@ -763,9 +827,8 @@ class TestKuduBackup extends KuduTestSuite {
   def restoreAndValidateTable(tableName: String, expectedRowCount: Long) = {
     val options = createRestoreOptions(Seq(tableName))
     assertTrue(runRestore(options))
-
-    // Verify the table data.
-    val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore")
+    val restoreTableName = KuduRestore.getRestoreTableName(tableName, options)
+    val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$restoreTableName")
     assertEquals(rdd.collect.length, expectedRowCount)
   }
 
diff --git 
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala 
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
index 0ce464c..b65bd38 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
@@ -53,6 +53,10 @@ class TestOptions extends KuduTestSuite {
         |  --kuduMasterAddresses <value>
         |                           Comma-separated addresses of Kudu masters. 
Default: localhost
         |  --createTables <value>   If true, create the tables during restore. 
Set to false if the target tables already exist. Default: true
+        |  --removeImpalaPrefix <value>
+        |                           If true, removes the "impala::" prefix, if 
present from the restored table names. This is advisable if backup was taken in 
a Kudu cluster without HMS sync and restoring to Kudu cluster which has HMS 
sync in place. Only used when createTables is true. Default: false
+        |  --newDatabaseName <value>
+        |                           If set, replaces the existing database 
name and if there is no existing database name, a new database name is added. 
Setting this to an empty string will have the same effect of not using the flag 
at all. For example, if this is set to newdb for the tables testtable and 
impala::db.testtable the restored tables will have the names newdb.testtable 
and impala::newdb.testtable respectively, assuming removeImpalaPrefix is set to 
false
         |  --tableSuffix <value>    If set, the suffix to add to the restored 
table names. Only used when createTables is true.
         |  --timestampMs <value>    A UNIX timestamp in milliseconds that 
defines the latest time to use when selecting restore candidates. Default: 
`System.currentTimeMillis()`
         |  --failOnFirstError       Whether to fail the restore job as soon as 
a single table restore fails. Default: false

Reply via email to