This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 5faf79726605e3a5d8b33bd18ce94c8c09241d3f Author: Grant Henke <[email protected]> AuthorDate: Fri Oct 30 11:35:04 2020 -0500 KUDU-1563 Use DELETE_IGNORE in KuduRestore job This patch changes the KuduRestore job to use DELETE_IGNORE operations instead of DELETE when the cluster supports ignore operations. `session.setIgnoreAllNotFoundRows(true)` is retained to support falling back to DELETE operations for backward compatibility. Change-Id: Ib6f6d5a31be77630e79ff1566e796eb5183a5d22 Reviewed-on: http://gerrit.cloudera.org:8080/16683 Reviewed-by: Attila Bukor <[email protected]> Reviewed-by: Alexey Serbin <[email protected]> Tested-by: Grant Henke <[email protected]> --- .../main/scala/org/apache/kudu/backup/KuduRestore.scala | 13 ++++++++++--- .../scala/org/apache/kudu/backup/TestKuduBackup.scala | 17 ++++++++++++++++- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala index 6549cd6..ef01a8d 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala @@ -91,6 +91,7 @@ object KuduRestore { .load(backup.path.toString) // Default the the row action column with a value of "UPSERT" so that the // rows from a full backup, which don't have a row action, are upserted. + // TODO(ghenke): Consider using INSERT_IGNORE for full backups. .na .fill(RowAction.UPSERT.getValue, Seq(rowActionCol)) @@ -106,8 +107,8 @@ object KuduRestore { val session = context.syncClient.newSession session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND) // In the case of task retries we need to ignore NotFound errors for deleted rows. - // TODO(KUDU-1563): Implement server side ignore capabilities to improve performance - // and reliability. + // This can't occur if DELETE_IGNORE is used, but still needs to be set in the case + // DELETE is used for backwards compatibility. session.setIgnoreAllNotFoundRows(true) try for (internalRow <- internalRows) { // Convert the InternalRows to Rows. @@ -120,7 +121,13 @@ object KuduRestore { // Generate an operation based on the row action. val operation = rowAction match { case RowAction.UPSERT => table.newUpsert() - case RowAction.DELETE => table.newDelete() + case RowAction.DELETE => { + if (context.supportsIgnoreOperations) { + table.newDeleteIgnore() + } else { + table.newDelete() + } + } case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction") } // Convert the Spark row to a partial row and set it on the operation. diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index da8fe58..817875e 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -32,8 +32,9 @@ import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobDescriptionCollector import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter import org.apache.kudu.spark.kudu._ import org.apache.kudu.test.CapturingLogAppender -import org.apache.kudu.test.KuduTestHarness.TabletServerConfig +import org.apache.kudu.test.KuduTestHarness import org.apache.kudu.test.RandomUtils +import org.apache.kudu.test.KuduTestHarness.TabletServerConfig import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder import org.apache.kudu.util.HybridTimeUtil import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder @@ -591,6 +592,20 @@ class TestKuduBackup extends KuduTestSuite { @Test def testDeleteIgnore(): Unit = { + doDeleteIgnoreTest() + } + + /** + * Identical to the above test, but exercising the old session based delete ignore operations, + * ensuring we functionally support the same semantics. + */ + @Test + @KuduTestHarness.MasterServerConfig(flags = Array("--master_support_ignore_operations=false")) + def testLegacyDeleteIgnore(): Unit = { + doDeleteIgnoreTest() + } + + def doDeleteIgnoreTest(): Unit = { insertRows(table, 100) // Insert data into the default test table. // Run and validate initial backup.
