This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new bd37d60 [backup] KUDU-3183 Add --newDatabaseName and
--removeImpalaPrefix options to restore job
bd37d60 is described below
commit bd37d601d36bcf51c595baad52b5d24b5e20d684
Author: Abhishek Chennaka <[email protected]>
AuthorDate: Mon May 3 11:45:41 2021 -0400
[backup] KUDU-3183 Add --newDatabaseName and --removeImpalaPrefix
options to restore job
While Kudu does not have a notion of databases, the full table name is
usually stored as <database>.<tablename> on the Kudu side (NOTE: the
database name is optional). Using the options in this patch, users
can change the existing database name of a table, or add a new
database name to it (i.e. the prefix before the '.' in the full
table name), and can also remove the "impala::" prefix from the
tables being restored.
1. --newDatabaseName: Use this option to specify the new database
name for the restored tables. It replaces any existing database
name, and if there is no existing database name, the new one is
added to the table name. It does not affect the "impala::" prefix.
E.g. setting the database name "newDB" for the tables
impala::default.test, impala::test, default.test, and test results
in the table names impala::newDB.test, impala::newDB.test,
newDB.test, and newDB.test (any --tableSuffix is still appended
afterwards). This does not affect the existing/source tables.
2. --removeImpalaPrefix: If enabled, this option removes the
"impala::" prefix, if present, from the restored table names. This is
advisable when tables backed up from a Kudu cluster without HMS
integration are being restored to a Kudu cluster with HMS integration.
Change-Id: I65adcc1b3de0a8e1ac5b7f50a2d3a7036aa69421
Reviewed-on: http://gerrit.cloudera.org:8080/17388
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <[email protected]>
---
.../scala/org/apache/kudu/backup/KuduRestore.scala | 36 ++++++++++-
.../scala/org/apache/kudu/backup/Options.scala | 21 +++++++
.../org/apache/kudu/backup/TestKuduBackup.scala | 69 +++++++++++++++++++++-
.../scala/org/apache/kudu/backup/TestOptions.scala | 4 ++
4 files changed, 126 insertions(+), 4 deletions(-)
diff --git
a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index ef01a8d..92d0717 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -44,6 +44,40 @@ import scala.util.Try
@InterfaceStability.Unstable
object KuduRestore {
val log: Logger = LoggerFactory.getLogger(getClass)
+ val ImpalaPrefix = "impala::"
+
+ /**
+ * Returns the table name in which the data will be restored considering the
flags removeImpalaPrefix,
+ * newDatabaseName and tableSuffix
+ */
+ def getRestoreTableName(fullTableName: String, options: RestoreOptions):
String = {
+ // Break the table down into prefix::databaseName.tableName
+ var prefix = ""
+ var databaseName = ""
+ var tableName = fullTableName
+ val hasImpalaPrefix = tableName.startsWith(ImpalaPrefix)
+ if (hasImpalaPrefix) {
+ prefix = ImpalaPrefix
+ tableName = tableName.substring(ImpalaPrefix.length)
+ }
+ val hasDatabase = tableName.contains(".")
+ if (hasDatabase) {
+ databaseName = tableName.substring(0, tableName.indexOf(".") + 1)
+ tableName = tableName.substring(tableName.indexOf(".") + 1)
+ }
+
+ // If the user does not want the Impala prefix, drop it
+ if (options.removeImpalaPrefix) {
+ prefix = ""
+ }
+
+ // If there is a databaseName specified by the user, use that
+ if (options.newDatabaseName.nonEmpty) {
+ databaseName = options.newDatabaseName.concat(".")
+ }
+
+ s"${prefix}${databaseName}${tableName}${options.tableSuffix}"
+ }
private def doRestore(
tableName: String,
@@ -58,7 +92,7 @@ object KuduRestore {
val graph = backupMap(tableName)
val restorePath = graph.restorePath
val lastMetadata = restorePath.backups.last.metadata
- val restoreName = s"${lastMetadata.getTableName}${options.tableSuffix}"
+ val restoreName = getRestoreTableName(lastMetadata.getTableName, options)
val numJobs = restorePath.backups.size
var currentJob = 1
restorePath.backups.foreach { backup =>
diff --git
a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index 2c9762a..5201e60 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -186,6 +186,8 @@ case class RestoreOptions(
tables: Seq[String],
rootPath: String,
kuduMasterAddresses: String =
InetAddress.getLocalHost.getCanonicalHostName,
+ removeImpalaPrefix: Boolean = RestoreOptions.DefaultRemoveImpalaPrefix,
+ newDatabaseName: String = "",
tableSuffix: String = "",
createTables: Boolean = RestoreOptions.DefaultCreateTables,
timestampMs: Long = System.currentTimeMillis(),
@@ -194,6 +196,7 @@ case class RestoreOptions(
restoreOwner: Boolean = RestoreOptions.DefaultRestoreOwner)
object RestoreOptions {
+ val DefaultRemoveImpalaPrefix: Boolean = false
val DefaultCreateTables: Boolean = true
val DefaultFailOnFirstError = false
val DefaultNumParallelRestores = 1
@@ -221,6 +224,24 @@ object RestoreOptions {
"already exist. Default: " + DefaultCreateTables)
.optional()
+ opt[Boolean]("removeImpalaPrefix")
+ .action((v, o) => o.copy(removeImpalaPrefix = v))
+ .text("If true, removes the \"impala::\" prefix, if present from the
restored table names. This is " +
+ "advisable if backup was taken in a Kudu cluster without HMS sync
and restoring to " +
+ "Kudu cluster which has HMS sync in place. Only used when
createTables is true. Default: " +
+ DefaultRemoveImpalaPrefix)
+ .optional()
+
+ opt[String]("newDatabaseName")
+ .action((v, o) => o.copy(newDatabaseName = v))
+ .text(
+ "If set, replaces the existing database name and if there is no
existing database name, a new database " +
+ "name is added. Setting this to an empty string will have the same
effect of not using the flag at all. " +
+ "For example, if this is set to newdb for the tables testtable and
impala::db.testtable the restored " +
+ "tables will have the names newdb.testtable and
impala::newdb.testtable respectively, assuming " +
+ "removeImpalaPrefix is set to false")
+ .optional()
+
opt[String]("tableSuffix")
.action((v, o) => o.copy(tableSuffix = v))
.text("If set, the suffix to add to the restored table names. Only
used when " +
diff --git
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 5ecd468..2177622 100644
---
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -591,6 +591,70 @@ class TestKuduBackup extends KuduTestSuite {
}
@Test
+ def testTableNameChangeFlags() {
+ // Create four tables and load data
+ val rowCount = 100
+ val tableNameWithImpalaPrefix =
"impala::oldDatabase.testTableWithImpalaPrefix"
+ val tableWithImpalaPrefix =
+ kuduClient.createTable(tableNameWithImpalaPrefix, schema, tableOptions)
+ val tableNameWithoutImpalaPrefix =
"oldDatabase.testTableWithoutImpalaPrefix"
+ val tableWithoutImpalaPrefix =
+ kuduClient.createTable(tableNameWithoutImpalaPrefix, schema,
tableOptions)
+ val tableNameWithImpalaPrefixWithoutDb =
"impala::testTableWithImpalaPrefixWithoutDb"
+ val tableWithImpalaPrefixWithoutDb =
+ kuduClient.createTable(tableNameWithImpalaPrefixWithoutDb, schema,
tableOptions)
+ val tableNameWithoutImpalaPrefixWithoutDb =
"testTableWithoutImpalaPrefixWithoutDb"
+ val tableWithoutImpalaPrefixWithoutDb =
+ kuduClient.createTable(tableNameWithoutImpalaPrefixWithoutDb, schema,
tableOptions)
+ insertRows(tableWithImpalaPrefix, rowCount)
+ insertRows(tableWithoutImpalaPrefix, rowCount)
+ insertRows(tableWithImpalaPrefixWithoutDb, rowCount)
+ insertRows(tableWithoutImpalaPrefixWithoutDb, rowCount)
+
+ // Backup the four tables
+ backupAndValidateTable(tableNameWithImpalaPrefix, rowCount, false)
+ backupAndValidateTable(tableNameWithoutImpalaPrefix, rowCount, false)
+ backupAndValidateTable(tableNameWithImpalaPrefixWithoutDb, rowCount, false)
+ backupAndValidateTable(tableNameWithoutImpalaPrefixWithoutDb, rowCount,
false)
+
+ // Restore with removeImpalaPrefix = true and newDatabase = newDatabase
and validate the tables
+ val withImpalaPrefix =
+ createRestoreOptions(Seq(tableNameWithImpalaPrefix))
+ .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase")
+ assertTrue(runRestore(withImpalaPrefix))
+ val rddWithImpalaPrefix =
+ kuduContext
+ .kuduRDD(ss.sparkContext,
s"newDatabase.testTableWithImpalaPrefix-restore")
+ assertEquals(rddWithImpalaPrefix.collect.length, rowCount)
+
+ val withoutImpalaPrefix =
+ createRestoreOptions(Seq(tableNameWithoutImpalaPrefix))
+ .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase")
+ assertTrue(runRestore(withoutImpalaPrefix))
+ val rddWithoutImpalaPrefix =
+ kuduContext.kuduRDD(ss.sparkContext,
s"newDatabase.testTableWithoutImpalaPrefix-restore")
+ assertEquals(rddWithoutImpalaPrefix.collect.length, rowCount)
+
+ val withImpalaPrefixWithoutDb =
+ createRestoreOptions(Seq(tableNameWithImpalaPrefixWithoutDb))
+ .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase")
+ assertTrue(runRestore(withImpalaPrefixWithoutDb))
+ val rddWithImpalaPrefixWithoutDb =
+ kuduContext
+ .kuduRDD(ss.sparkContext,
s"newDatabase.testTableWithImpalaPrefixWithoutDb-restore")
+ assertEquals(rddWithImpalaPrefixWithoutDb.collect.length, rowCount)
+
+ val withoutImpalaPrefixWithoutDb =
+ createRestoreOptions(Seq(tableNameWithoutImpalaPrefixWithoutDb))
+ .copy(removeImpalaPrefix = true, newDatabaseName = "newDatabase")
+ assertTrue(runRestore(withoutImpalaPrefixWithoutDb))
+ val rddWithoutImpalaPrefixWithoutDb =
+ kuduContext
+ .kuduRDD(ss.sparkContext,
s"newDatabase.testTableWithoutImpalaPrefixWithoutDb-restore")
+ assertEquals(rddWithoutImpalaPrefixWithoutDb.collect.length, rowCount)
+ }
+
+ @Test
def testDeleteIgnore(): Unit = {
doDeleteIgnoreTest()
}
@@ -763,9 +827,8 @@ class TestKuduBackup extends KuduTestSuite {
def restoreAndValidateTable(tableName: String, expectedRowCount: Long) = {
val options = createRestoreOptions(Seq(tableName))
assertTrue(runRestore(options))
-
- // Verify the table data.
- val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore")
+ val restoreTableName = KuduRestore.getRestoreTableName(tableName, options)
+ val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$restoreTableName")
assertEquals(rdd.collect.length, expectedRowCount)
}
diff --git
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
index 0ce464c..b65bd38 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
@@ -53,6 +53,10 @@ class TestOptions extends KuduTestSuite {
| --kuduMasterAddresses <value>
| Comma-separated addresses of Kudu masters.
Default: localhost
| --createTables <value> If true, create the tables during restore.
Set to false if the target tables already exist. Default: true
+ | --removeImpalaPrefix <value>
+ | If true, removes the "impala::" prefix, if
present from the restored table names. This is advisable if backup was taken in
a Kudu cluster without HMS sync and restoring to Kudu cluster which has HMS
sync in place. Only used when createTables is true. Default: false
+ | --newDatabaseName <value>
+ | If set, replaces the existing database
name and if there is no existing database name, a new database name is added.
Setting this to an empty string will have the same effect of not using the flag
at all. For example, if this is set to newdb for the tables testtable and
impala::db.testtable the restored tables will have the names newdb.testtable
and impala::newdb.testtable respectively, assuming removeImpalaPrefix is set to
false
| --tableSuffix <value> If set, the suffix to add to the restored
table names. Only used when createTables is true.
| --timestampMs <value> A UNIX timestamp in milliseconds that
defines the latest time to use when selecting restore candidates. Default:
`System.currentTimeMillis()`
| --failOnFirstError Whether to fail the restore job as soon as
a single table restore fails. Default: false