This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch branch-1.15.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 14a28e167630824c1eaeb7139e095e14106e5405 Author: Abhishek Chennaka <[email protected]> AuthorDate: Mon May 3 11:45:41 2021 -0400 [backup] KUDU-3183 Add --newDatabaseName and --removeImpalaPrefix options to restore job Although Kudu has no notion of a database, the full table name is usually stored as <database>.<tablename> on the Kudu side (NOTE: the database name is optional). With the options in this patch, users can replace the existing database name of a restored table or add a database name to it (i.e. the part of the full table name before the first '.'), and can remove the "impala::" prefix from the tables being restored. 1. --newDatabaseName: Use this option to specify the new database name for the restored table. It overwrites any existing database name; if the table name has no database name, the new one is added. It does not affect the "impala::" prefix. E.g.: adding the database name "newDB" to the tables impala::default.test, impala::test, default.test and test results in the table names impala::newDB.test, impala::newDB.test, newDB.test and newDB.test. The existing/source tables are not affected. 2. --removeImpalaPrefix: If enabled, this option removes the "impala::" prefix, if present, from the restored table names. This is advisable if tables are backed up from Kudu clusters without HMS integration and restored to Kudu clusters with HMS integration. 
Change-Id: I65adcc1b3de0a8e1ac5b7f50a2d3a7036aa69421 Reviewed-on: http://gerrit.cloudera.org:8080/17388 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <[email protected]> (cherry picked from commit bd37d601d36bcf51c595baad52b5d24b5e20d684) Reviewed-on: http://gerrit.cloudera.org:8080/17513 Tested-by: Grant Henke <[email protected]> Reviewed-by: Bankim Bhavsar <[email protected]> --- .../scala/org/apache/kudu/backup/KuduRestore.scala | 36 ++++++++++- .../scala/org/apache/kudu/backup/Options.scala | 21 +++++++ .../org/apache/kudu/backup/TestKuduBackup.scala | 69 +++++++++++++++++++++- .../scala/org/apache/kudu/backup/TestOptions.scala | 4 ++ 4 files changed, 126 insertions(+), 4 deletions(-) diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala index ef01a8d..92d0717 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala @@ -44,6 +44,40 @@ import scala.util.Try @InterfaceStability.Unstable object KuduRestore { val log: Logger = LoggerFactory.getLogger(getClass) + val ImpalaPrefix = "impala::" + + /** + * Returns the table name in which the data will be restored considering the flags removeImpalaPrefix, + * newDatabaseName and tableSuffix + */ + def getRestoreTableName(fullTableName: String, options: RestoreOptions): String = { + // Break the table down into prefix::databaseName.tableName + var prefix = "" + var databaseName = "" + var tableName = fullTableName + val hasImpalaPrefix = tableName.startsWith(ImpalaPrefix) + if (hasImpalaPrefix) { + prefix = ImpalaPrefix + tableName = tableName.substring(ImpalaPrefix.length) + } + val hasDatabase = tableName.contains(".") + if (hasDatabase) { + databaseName = tableName.substring(0, tableName.indexOf(".") + 1) + tableName = tableName.substring(tableName.indexOf(".") + 1) + } + + // If the user does not want 
the Impala prefix, drop it + if (options.removeImpalaPrefix) { + prefix = "" + } + + // If there is a databaseName specified by the user, use that + if (options.newDatabaseName.nonEmpty) { + databaseName = options.newDatabaseName.concat(".") + } + + s"${prefix}${databaseName}${tableName}${options.tableSuffix}" + } private def doRestore( tableName: String, @@ -58,7 +92,7 @@ object KuduRestore { val graph = backupMap(tableName) val restorePath = graph.restorePath val lastMetadata = restorePath.backups.last.metadata - val restoreName = s"${lastMetadata.getTableName}${options.tableSuffix}" + val restoreName = getRestoreTableName(lastMetadata.getTableName, options) val numJobs = restorePath.backups.size var currentJob = 1 restorePath.backups.foreach { backup => diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala index 2c9762a..5201e60 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala @@ -186,6 +186,8 @@ case class RestoreOptions( tables: Seq[String], rootPath: String, kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName, + removeImpalaPrefix: Boolean = RestoreOptions.DefaultRemoveImpalaPrefix, + newDatabaseName: String = "", tableSuffix: String = "", createTables: Boolean = RestoreOptions.DefaultCreateTables, timestampMs: Long = System.currentTimeMillis(), @@ -194,6 +196,7 @@ case class RestoreOptions( restoreOwner: Boolean = RestoreOptions.DefaultRestoreOwner) object RestoreOptions { + val DefaultRemoveImpalaPrefix: Boolean = false val DefaultCreateTables: Boolean = true val DefaultFailOnFirstError = false val DefaultNumParallelRestores = 1 @@ -221,6 +224,24 @@ object RestoreOptions { "already exist. 
Default: " + DefaultCreateTables) .optional() + opt[Boolean]("removeImpalaPrefix") + .action((v, o) => o.copy(removeImpalaPrefix = v)) + .text("If true, removes the \"impala::\" prefix, if present from the restored table names. This is " + + "advisable if backup was taken in a Kudu cluster without HMS sync and restoring to " + + "Kudu cluster which has HMS sync in place. Only used when createTables is true. Default: " + + DefaultRemoveImpalaPrefix) + .optional() + + opt[String]("newDatabaseName") + .action((v, o) => o.copy(newDatabaseName = v)) + .text( + "If set, replaces the existing database name and if there is no existing database name, a new database " + + "name is added. Setting this to an empty string will have the same effect of not using the flag at all. " + + "For example, if this is set to newdb for the tables testtable and impala::db.testtable the restored " + + "tables will have the names newdb.testtable and impala::newdb.testtable respectively, assuming " + + "removeImpalaPrefix is set to false") + .optional() + opt[String]("tableSuffix") .action((v, o) => o.copy(tableSuffix = v)) .text("If set, the suffix to add to the restored table names. 
Only used when " + diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index 5ecd468..2177622 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -591,6 +591,70 @@ class TestKuduBackup extends KuduTestSuite { } @Test + def testTableNameChangeFlags() { + // Create four tables and load data + val rowCount = 100 + val tableNameWithImpalaPrefix = "impala::oldDatabase.testTableWithImpalaPrefix" + val tableWithImpalaPrefix = + kuduClient.createTable(tableNameWithImpalaPrefix, schema, tableOptions) + val tableNameWithoutImpalaPrefix = "oldDatabase.testTableWithoutImpalaPrefix" + val tableWithoutImpalaPrefix = + kuduClient.createTable(tableNameWithoutImpalaPrefix, schema, tableOptions) + val tableNameWithImpalaPrefixWithoutDb = "impala::testTableWithImpalaPrefixWithoutDb" + val tableWithImpalaPrefixWithoutDb = + kuduClient.createTable(tableNameWithImpalaPrefixWithoutDb, schema, tableOptions) + val tableNameWithoutImpalaPrefixWithoutDb = "testTableWithoutImpalaPrefixWithoutDb" + val tableWithoutImpalaPrefixWithoutDb = + kuduClient.createTable(tableNameWithoutImpalaPrefixWithoutDb, schema, tableOptions) + insertRows(tableWithImpalaPrefix, rowCount) + insertRows(tableWithoutImpalaPrefix, rowCount) + insertRows(tableWithImpalaPrefixWithoutDb, rowCount) + insertRows(tableWithoutImpalaPrefixWithoutDb, rowCount) + + // Backup the four tables + backupAndValidateTable(tableNameWithImpalaPrefix, rowCount, false) + backupAndValidateTable(tableNameWithoutImpalaPrefix, rowCount, false) + backupAndValidateTable(tableNameWithImpalaPrefixWithoutDb, rowCount, false) + backupAndValidateTable(tableNameWithoutImpalaPrefixWithoutDb, rowCount, false) + + // Restore with removeImpalaPrefix = true and newDatabase = newDatabase and validate the tables + val 
withImpalaPrefix = + createRestoreOptions(Seq(tableNameWithImpalaPrefix)) + .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase") + assertTrue(runRestore(withImpalaPrefix)) + val rddWithImpalaPrefix = + kuduContext + .kuduRDD(ss.sparkContext, s"newDatabase.testTableWithImpalaPrefix-restore") + assertEquals(rddWithImpalaPrefix.collect.length, rowCount) + + val withoutImpalaPrefix = + createRestoreOptions(Seq(tableNameWithoutImpalaPrefix)) + .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase") + assertTrue(runRestore(withoutImpalaPrefix)) + val rddWithoutImpalaPrefix = + kuduContext.kuduRDD(ss.sparkContext, s"newDatabase.testTableWithoutImpalaPrefix-restore") + assertEquals(rddWithoutImpalaPrefix.collect.length, rowCount) + + val withImpalaPrefixWithoutDb = + createRestoreOptions(Seq(tableNameWithImpalaPrefixWithoutDb)) + .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase") + assertTrue(runRestore(withImpalaPrefixWithoutDb)) + val rddWithImpalaPrefixWithoutDb = + kuduContext + .kuduRDD(ss.sparkContext, s"newDatabase.testTableWithImpalaPrefixWithoutDb-restore") + assertEquals(rddWithImpalaPrefixWithoutDb.collect.length, rowCount) + + val withoutImpalaPrefixWithoutDb = + createRestoreOptions(Seq(tableNameWithoutImpalaPrefixWithoutDb)) + .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase") + assertTrue(runRestore(withoutImpalaPrefixWithoutDb)) + val rddWithoutImpalaPrefixWithoutDb = + kuduContext + .kuduRDD(ss.sparkContext, s"newDatabase.testTableWithoutImpalaPrefixWithoutDb-restore") + assertEquals(rddWithoutImpalaPrefixWithoutDb.collect.length, rowCount) + } + + @Test def testDeleteIgnore(): Unit = { doDeleteIgnoreTest() } @@ -763,9 +827,8 @@ class TestKuduBackup extends KuduTestSuite { def restoreAndValidateTable(tableName: String, expectedRowCount: Long) = { val options = createRestoreOptions(Seq(tableName)) assertTrue(runRestore(options)) - - // Verify the table data. 
- val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore") + val restoreTableName = KuduRestore.getRestoreTableName(tableName, options) + val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$restoreTableName") assertEquals(rdd.collect.length, expectedRowCount) } diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala index 0ce464c..b65bd38 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala @@ -53,6 +53,10 @@ class TestOptions extends KuduTestSuite { | --kuduMasterAddresses <value> | Comma-separated addresses of Kudu masters. Default: localhost | --createTables <value> If true, create the tables during restore. Set to false if the target tables already exist. Default: true + | --removeImpalaPrefix <value> + | If true, removes the "impala::" prefix, if present from the restored table names. This is advisable if backup was taken in a Kudu cluster without HMS sync and restoring to Kudu cluster which has HMS sync in place. Only used when createTables is true. Default: false + | --newDatabaseName <value> + | If set, replaces the existing database name and if there is no existing database name, a new database name is added. Setting this to an empty string will have the same effect of not using the flag at all. For example, if this is set to newdb for the tables testtable and impala::db.testtable the restored tables will have the names newdb.testtable and impala::newdb.testtable respectively, assuming removeImpalaPrefix is set to false | --tableSuffix <value> If set, the suffix to add to the restored table names. Only used when createTables is true. | --timestampMs <value> A UNIX timestamp in milliseconds that defines the latest time to use when selecting restore candidates. 
Default: `System.currentTimeMillis()` | --failOnFirstError Whether to fail the restore job as soon as a single table restore fails. Default: false
