Repository: kudu Updated Branches: refs/heads/master 6ce61d6e6 -> aa20ef057
KUDU-2584: Prevent flaky off-by-one errors in backup tests This patch adds 1 ms to the target snapshot time when a backup is taken. This ensures that we don't have flakes due to off-by-one errors where not all of the values are read. The underlying reason for adding 1 ms is that we pass the timestamp in millisecond granularity, but the snapshot time consists of microseconds plus a logical clock. This means that if data is inserted during the same millisecond as the target snapshot time, the sub-millisecond part of the timestamp is truncated and that data may go unread. Additionally, this patch copies over the timestamp propagation call from the KuduRDD and ensures the Spark tests use the Kudu client from the KuduContext. This should further prevent future snapshot issues. This patch also includes an auto-formatting change in KuduBackupOptions that must have been missed in a previous commit. Change-Id: Ia0f1b4a4138cc8c913543a68fad748927cdc439d Reviewed-on: http://gerrit.cloudera.org:8080/11815 Tested-by: Grant Henke <[email protected]> Reviewed-by: Adar Dembo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/aa20ef05 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/aa20ef05 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/aa20ef05 Branch: refs/heads/master Commit: aa20ef0576cd9e2cf4a035ecdf6dbd746d94c586 Parents: 6ce61d6 Author: Grant Henke <[email protected]> Authored: Mon Oct 29 12:50:11 2018 -0500 Committer: Grant Henke <[email protected]> Committed: Tue Nov 6 23:07:14 2018 +0000 ---------------------------------------------------------------------- .../apache/kudu/backup/KuduBackupOptions.scala | 5 +++-- .../org/apache/kudu/backup/KuduBackupRDD.scala | 10 ++++++++-- .../org/apache/kudu/backup/TestKuduBackup.scala | 18 ++++++++++++++++-- .../apache/kudu/spark/kudu/KuduTestSuite.scala | 5 +++-- 4 files changed, 30 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kudu/blob/aa20ef05/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala ---------------------------------------------------------------------- diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala index c594b1a..82a8eb4 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala @@ -87,8 +87,9 @@ object KuduBackupOptions { opt[Long]("keepAlivePeriodMs") .action((v, o) => o.copy(keepAlivePeriodMs = v)) - .text("Sets the period at which to send keep-alive requests to the tablet server to ensure" + - " that scanners do not time out") + .text( + "Sets the period at which to send keep-alive requests to the tablet server to ensure" + + " that scanners do not time out") .optional() arg[String]("<table>...") http://git-wip-us.apache.org/repos/asf/kudu/blob/aa20ef05/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala ---------------------------------------------------------------------- diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala index f42b369..9be2bf2 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala @@ -87,7 +87,7 @@ class KuduBackupRDD private[kudu] ( // TODO: Get deletes and updates for incremental backups. 
val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client) - new RowIterator(scanner, keepAlivePeriodMs) + new RowIterator(scanner, kuduContext, keepAlivePeriodMs) } override def getPreferredLocations(partition: Partition): Seq[String] = { @@ -106,7 +106,10 @@ private case class KuduBackupPartition(index: Int, scanToken: Array[Byte], locat * that takes the job partitions and task context and expects to return an Iterator[Row]. * This implementation facilitates that. */ -private class RowIterator(private val scanner: KuduScanner, val keepAlivePeriodMs: Long) +private class RowIterator( + private val scanner: KuduScanner, + val kuduContext: KuduContext, + val keepAlivePeriodMs: Long) extends Iterator[Row] { private var currentIterator: RowResultIterator = RowResultIterator.empty @@ -130,6 +133,9 @@ private class RowIterator(private val scanner: KuduScanner, val keepAlivePeriodM } currentIterator = scanner.nextRows() } + // Update timestampAccumulator with the client's last propagated + // timestamp on each executor. 
+ kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp) KeepKuduScannerAlive() currentIterator.hasNext } http://git-wip-us.apache.org/repos/asf/kudu/blob/aa20ef05/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala ---------------------------------------------------------------------- diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index e74fdcd..d3ff3c4 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -36,6 +36,7 @@ import org.apache.kudu.Type import org.apache.kudu.spark.kudu._ import org.apache.kudu.test.RandomUtils import org.apache.kudu.util.DecimalUtil +import org.apache.kudu.util.HybridTimeUtil import org.junit.Assert._ import org.junit.Test import org.slf4j.Logger @@ -349,9 +350,22 @@ class TestKuduBackup extends KuduTestSuite { def backupAndRestore(tableNames: String*): Unit = { val dir = Files.createTempDirectory("backup") val path = dir.toUri.toString - + val nowMs = System.currentTimeMillis() + + // Log the timestamps to simplify flaky debugging. + log.info(s"nowMs: ${System.currentTimeMillis()}") + val hts = HybridTimeUtil.HTTimestampToPhysicalAndLogical(kuduClient.getLastPropagatedTimestamp) + log.info(s"propagated physicalMicros: ${hts(0)}") + log.info(s"propagated logical: ${hts(1)}") + + // Add one millisecond to our target snapshot time. This will ensure we read all of the records + // in the backup and prevent flaky off-by-one errors. The underlying reason for adding 1 ms is + // that we pass the timestamp in millisecond granularity but the snapshot time has microsecond + // granularity. 
This means if the test runs fast enough that data is inserted with the same + // millisecond value as nowMs (after truncating the micros) the records inserted in the + // microseconds after truncation could be unread. val backupOptions = - new KuduBackupOptions(tableNames, path, harness.getMasterAddressesAsString) + new KuduBackupOptions(tableNames, path, harness.getMasterAddressesAsString, nowMs + 1) KuduBackup.run(backupOptions, ss) val restoreOptions = http://git-wip-us.apache.org/repos/asf/kudu/blob/aa20ef05/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala index 03b2ba2..da43ef6 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala @@ -125,11 +125,12 @@ trait KuduTestSuite extends JUnitSuite { @Before def setUpBase(): Unit = { - kuduClient = harness.getClient - ss = SparkSession.builder().config(conf).getOrCreate() kuduContext = new KuduContext(harness.getMasterAddressesAsString, ss.sparkContext) + // Spark tests should use the client from the kuduContext. + kuduClient = kuduContext.syncClient + table = kuduClient.createTable(tableName, schema, tableOptions) val simpleTableOptions = new CreateTableOptions()
