This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 70e575fb706290a059c0b8ad5b0c3b4be436f3b2 Author: Grant Henke <[email protected]> AuthorDate: Mon Apr 29 09:14:39 2019 -0500 [backup] Support column alterations between backups This patch adds support for column alterations between backups. When restoring each backup, it leverages the column ID map in the metadata to compare against the final metadata. Using this it can detect and handle added, dropped, and renamed columns. Change-Id: I15e159caf485d83f4d5a36305d677828840bff03 Reviewed-on: http://gerrit.cloudera.org:8080/13173 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> Reviewed-by: Mike Percy <[email protected]> --- .../scala/org/apache/kudu/backup/KuduRestore.scala | 44 +++++++++++- .../org/apache/kudu/backup/TestKuduBackup.scala | 79 ++++++++++++++++++++++ 2 files changed, 121 insertions(+), 2 deletions(-) diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala index eb2a03e..bf6103c 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala @@ -21,12 +21,15 @@ import org.apache.kudu.client.AlterTableOptions import org.apache.kudu.client.SessionConfiguration.FlushMode import org.apache.kudu.spark.kudu.KuduContext import org.apache.kudu.spark.kudu.RowConverter +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability import org.slf4j.Logger import org.slf4j.LoggerFactory +import scala.collection.JavaConverters._ + /** * The main class for a Kudu restore spark job. */ @@ -68,9 +71,7 @@ object KuduRestore { val backupSchema = io.dataSchema(TableMetadata.getKuduSchema(backup.metadata)) val rowActionCol = backupSchema.fields.last.name - val table = context.syncClient.openTable(restoreName) - val restoreSchema = io.dataSchema(table.getSchema) var data = session.sqlContext.read .format(backup.metadata.getDataFormat) @@ -81,6 +82,10 @@ object KuduRestore { .na .fill(RowAction.UPSERT.getValue, Seq(rowActionCol)) + // Adjust for dropped and renamed columns. + data = adjustSchema(data, backup.metadata, lastMetadata, rowActionCol) + val restoreSchema = data.schema + // Write the data to Kudu. data.queryExecution.toRdd.foreachPartition { internalRows => val table = context.syncClient.openTable(restoreName) @@ -151,6 +156,41 @@ object KuduRestore { }) } + /** + * Returns a modified DataFrame with columns adjusted to match the lastMetadata. + */ + private def adjustSchema( + df: DataFrame, + currentMetadata: TableMetadataPB, + lastMetadata: TableMetadataPB, + rowActionCol: String): DataFrame = { + log.info("Adjusting columns to handle alterations") + val idToName = lastMetadata.getColumnIdsMap.asScala.map(_.swap) + // Ignore the rowActionCol, which isn't a real column. + val currentColumns = currentMetadata.getColumnIdsMap.asScala.filter(_._1 != rowActionCol) + var result = df + // First drop all the columns that no longer exist. + // This is required to be sure a rename doesn't collide with an old column. + currentColumns.foreach { + case (colName, id) => + if (!idToName.contains(id)) { + // If the last metadata doesn't contain the id, the column is dropped. + log.info(s"Dropping the column $colName from backup data") + result = result.drop(colName) + } + } + // Then rename all the columns that were renamed in the last metadata. + currentColumns.foreach { + case (colName, id) => + if (idToName.contains(id) && idToName(id) != colName) { + // If the final name doesn't match the current name, the column is renamed. + log.info(s"Renamed the column $colName to ${idToName(id)} in backup data") + result = result.withColumnRenamed(colName, idToName(id)) + } + } + result + } + def main(args: Array[String]): Unit = { val options = RestoreOptions .parse(args) diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index c2b5356..dcc813a 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -26,6 +26,8 @@ import org.apache.kudu.client.PartitionSchema.HashBucketSchema import org.apache.kudu.client._ import org.apache.kudu.ColumnSchema import org.apache.kudu.Schema +import org.apache.kudu.Type +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder import org.apache.kudu.spark.kudu._ import org.apache.kudu.test.CapturingLogAppender import org.apache.kudu.test.RandomUtils @@ -270,6 +272,83 @@ class TestKuduBackup extends KuduTestSuite { restoreAndValidateTable(tableName, rowCount) } + @Test + def testColumnAlterHandling(): Unit = { + // Create a basic table. + val tableName = "testColumnAlterHandling" + val columns = List( + new ColumnSchemaBuilder("key", Type.INT32).key(true).build(), + new ColumnSchemaBuilder("col_a", Type.STRING).build(), + new ColumnSchemaBuilder("col_b", Type.STRING).build(), + new ColumnSchemaBuilder("col_c", Type.STRING).build(), + new ColumnSchemaBuilder("col_d", Type.STRING).build() + ) + val schema = new Schema(columns.asJava) + val options = new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) + var table = kuduClient.createTable(tableName, schema, options) + val session = kuduClient.newSession() + + // Insert some rows and take a full backup. + Range(0, 10).foreach { i => + val insert = table.newInsert + val row = insert.getRow + row.addInt("key", i) + row.addString("col_a", s"a$i") + row.addString("col_b", s"b$i") + row.addString("col_c", s"c$i") + row.addString("col_d", s"d$i") + session.apply(insert) + } + backupAndValidateTable(tableName, 10, false) + + // Rename col_a to col_1 and add a new col_a to ensure the column id's and defaults + // work correctly. Also drop col_d and rename col_c to ensure collisions on renaming + // columns don't occur when processing columns from left to right. + kuduClient.alterTable( + tableName, + new AlterTableOptions() + .renameColumn("col_a", "col_1") + .addColumn(new ColumnSchemaBuilder("col_a", Type.STRING) + .defaultValue("default") + .build()) + .dropColumn("col_b") + .dropColumn("col_d") + .renameColumn("col_c", "col_d") + ) + + // Insert more rows and take an incremental backup + table = kuduClient.openTable(tableName) + Range(10, 20).foreach { i => + val insert = table.newInsert + val row = insert.getRow + row.addInt("key", i) + row.addString("col_1", s"a$i") + row.addString("col_d", s"c$i") + session.apply(insert) + } + backupAndValidateTable(tableName, 10, true) + + // Restore the table and validate. + doRestore(createRestoreOptions(Seq(tableName))) + + val restoreTable = kuduClient.openTable(s"$tableName-restore") + val scanner = kuduClient.newScannerBuilder(restoreTable).build() + val rows = scanner.asScala.toSeq + + // Validate there are still 20 rows. + assertEquals(20, rows.length) + // Validate col_b is dropped from all rows. + assertTrue(rows.forall(!_.getSchema.hasColumn("col_b"))) + // Validate the existing and renamed columns have the expected set of values. + val expectedSet = Range(0, 20).toSet + assertEquals(expectedSet, rows.map(_.getInt("key")).toSet) + assertEquals(expectedSet.map(i => s"a$i"), rows.map(_.getString("col_1")).toSet) + assertEquals(expectedSet.map(i => s"c$i"), rows.map(_.getString("col_d")).toSet) + // Validate the new col_a has all defaults. + assertTrue(rows.forall(_.getString("col_a") == "default")) + } + def createRandomTable(): KuduTable = { val columnCount = random.nextInt(50) + 1 // At least one column. val keyColumnCount = random.nextInt(columnCount) + 1 // At least one key.
