KUDU-2563: [spark] Use the scanner keepAlive API
Adds scheduled keepAlive calls to the scanner in the KuduRDD RowIterator. The period at which the calls are made is configurable via keepAlivePeriodMs and defaults to 15 seconds (which is 1/4 of the default scanner TTL).
This implementation is similar to the Impala integration. It checks if a call to the keepAlive API is needed as it processes each row. Compared to a background thread, this has the downside of being less consistently scheduled and susceptible to scenarios in which a single row takes longer to process than the ttl. However, because the scanner is not thread safe, this is the most straightforward solution and has been proven to work. Change-Id: Ia7f26d6ab8deb24982055d247938a11e188c35db Reviewed-on: http://gerrit.cloudera.org:8080/11571 Reviewed-by: Grant Henke <[email protected]> Tested-by: Grant Henke <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cf1b1f42 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cf1b1f42 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cf1b1f42 Branch: refs/heads/master Commit: cf1b1f42cbcc3ee67477ddc44cd0ff5070f1caac Parents: fb79f8f Author: Grant Henke <[email protected]> Authored: Sun Sep 30 22:31:52 2018 -0500 Committer: Grant Henke <[email protected]> Committed: Fri Oct 5 02:24:04 2018 +0000 ---------------------------------------------------------------------- .../apache/kudu/client/AsyncKuduScanner.java | 7 ++ .../org/apache/kudu/client/KuduScanner.java | 9 +- .../apache/kudu/spark/kudu/DefaultSource.scala | 18 ++-- .../org/apache/kudu/spark/kudu/KuduRDD.scala | 25 +++++- .../kudu/spark/kudu/KuduReadOptions.scala | 4 + .../apache/kudu/spark/kudu/KuduRDDTest.scala | 87 ++++++++++++++++++++ 6 files changed, 139 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index 804978e..71b1146 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -605,6 +605,13 @@ public final class AsyncKuduScanner { } /** + * @return true if the scanner has been closed. + */ + public boolean isClosed() { + return closed; + } + + /** * Closes this scanner (don't forget to call this when you're done with it!). * <p> * Closing a scanner already closed has no effect. The deferred returned http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java index 209fada..f945d8f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java @@ -79,6 +79,13 @@ public class KuduScanner { } /** + * @return true if the scanner has been closed. + */ + public boolean isClosed() { + return asyncScanner.isClosed(); + } + + /** * Closes this scanner (don't forget to call this when you're done with it!). * <p> * Closing a scanner already closed has no effect. @@ -135,7 +142,7 @@ public class KuduScanner { * Returns the RemoteTablet currently being scanned, if any. 
*/ @InterfaceAudience.LimitedPrivate("Test") - RemoteTablet currentTablet() { + public RemoteTablet currentTablet() { return asyncScanner.currentTablet(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala index 29635a3..890ecda 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala @@ -62,6 +62,7 @@ class DefaultSource val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs" val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs" val BATCH_SIZE = "kudu.batchSize" + val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs" /** * Construct a BaseRelation using the provided context and parameters. @@ -77,13 +78,13 @@ class DefaultSource } /** - * Construct a BaseRelation using the provided context, parameters and schema. - * - * @param sqlContext SparkSQL context - * @param parameters parameters given to us from SparkSQL - * @param schema the schema used to select columns for the relation - * @return a BaseRelation Object - */ + * Construct a BaseRelation using the provided context, parameters and schema. 
+ * + * @param sqlContext SparkSQL context + * @param parameters parameters given to us from SparkSQL + * @param schema the schema used to select columns for the relation + * @return a BaseRelation Object + */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String], @@ -141,11 +142,14 @@ class DefaultSource parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality) val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong) val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong) + val keepAlivePeriodMs = + parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs) KuduReadOptions( batchSize, scanLocality, faultTolerantScanner, + keepAlivePeriodMs, scanRequestTimeoutMs, socketReadTimeoutMs) } http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala index 77dabcc..2deea16 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala @@ -24,7 +24,6 @@ import org.apache.spark.SparkContext import org.apache.spark.TaskContext import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability - import org.apache.kudu.client._ import org.apache.kudu.Type import org.apache.kudu.client @@ -45,6 +44,9 @@ class KuduRDD private[kudu] ( @transient val sc: SparkContext) extends RDD[Row](sc, Nil) { + // Defined here because the options are transient. 
+ private val keepAlivePeriodMs = options.keepAlivePeriodMs + override protected def getPartitions: Array[Partition] = { val builder = kuduContext.syncClient .newScanTokenBuilder(table) @@ -91,7 +93,7 @@ class KuduRDD private[kudu] ( val partition: KuduPartition = part.asInstanceOf[KuduPartition] val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client) - new RowIterator(scanner, kuduContext) + new RowIterator(scanner, kuduContext, keepAlivePeriodMs) } override def getPreferredLocations(partition: Partition): Seq[String] = { @@ -112,11 +114,27 @@ private class KuduPartition( * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]]. * @param scanner the wrapped scanner * @param kuduContext the kudu context + * @param keepAlivePeriodMs the period in which to call the keepAlive on the scanners */ -private class RowIterator(private val scanner: KuduScanner, private val kuduContext: KuduContext) +private class RowIterator( + val scanner: KuduScanner, + val kuduContext: KuduContext, + val keepAlivePeriodMs: Long) extends Iterator[Row] { private var currentIterator: RowResultIterator = RowResultIterator.empty + private var lastKeepAliveTimeMs = System.currentTimeMillis() + + /** + * Calls the keepAlive API on the current scanner if the keepAlivePeriodMs has passed. + */ + private def KeepKuduScannerAlive(): Unit = { + val now = System.currentTimeMillis + if (now >= lastKeepAliveTimeMs + keepAlivePeriodMs && !scanner.isClosed) { + scanner.keepAlive() + lastKeepAliveTimeMs = now + } + } override def hasNext: Boolean = { while (!currentIterator.hasNext && scanner.hasMoreRows) { @@ -128,6 +146,7 @@ private class RowIterator(private val scanner: KuduScanner, private val kuduCont // timestamp on each executor. 
kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp) } + KeepKuduScannerAlive() currentIterator.hasNext } http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala index 7c9b888..a1983b5 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala @@ -31,6 +31,8 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._ * take place at the closest replica * @param faultTolerantScanner scanner type to be used. Fault tolerant if true, * otherwise, use non fault tolerant one + * @param keepAlivePeriodMs The period at which to send keep-alive requests to the tablet + * server to ensure that scanners do not time out * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds * @param socketReadTimeoutMs Maximum time allowed when waiting on data from a socket */ @@ -40,6 +42,7 @@ case class KuduReadOptions( batchSize: Int = defaultBatchSize, scanLocality: ReplicaSelection = defaultScanLocality, faultTolerantScanner: Boolean = defaultFaultTolerantScanner, + keepAlivePeriodMs: Long = defaultKeepAlivePeriodMs, scanRequestTimeoutMs: Option[Long] = None, socketReadTimeoutMs: Option[Long] = None) @@ -47,4 +50,5 @@ object KuduReadOptions { val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting? val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA val defaultFaultTolerantScanner: Boolean = false + val defaultKeepAlivePeriodMs: Long = 15000 // 25% of the default scanner ttl. 
} http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala index f0fb4a0..49bc15e 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala @@ -17,6 +17,15 @@ package org.apache.kudu.spark.kudu +import scala.collection.JavaConverters._ +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder +import org.apache.kudu.client.CreateTableOptions +import org.apache.kudu.Schema +import org.apache.kudu.Type +import org.apache.kudu.test.KuduTestHarness.TabletServerConfig +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row import org.junit.Test class KuduRDDTest extends KuduTestSuite { @@ -27,4 +36,82 @@ class KuduRDDTest extends KuduTestSuite { val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key")) assert(rdd.collect.length == 100) } + + @Test + @TabletServerConfig( + // Hard coded values because Scala doesn't handle array constants in annotations. + flags = Array( + "--scanner_ttl_ms=5000", + "--scanner_gc_check_interval_us=500000" // 10% of the TTL. + )) + def testKeepAlive() { + val rowCount = 500 + val shortScannerTtlMs = 5000 + + // Create a simple table with a single partition. 
+ val tableName = "testKeepAlive" + val tableSchema = { + val columns = List( + new ColumnSchemaBuilder("key", Type.INT32).key(true).build(), + new ColumnSchemaBuilder("val", Type.INT32).build()).asJava + new Schema(columns) + } + val tableOptions = new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) + .setNumReplicas(1) + val table = kuduClient.createTable(tableName, tableSchema, tableOptions) + + val session = kuduClient.newSession() + Range(0, rowCount).map { i => + val insert = table.newInsert + val row = insert.getRow + row.addInt(0, i) + row.addInt(1, i) + session.apply(insert) + } + session.flush() + + def processRDD(rdd: RDD[Row]): Unit = { + // Ensure reading takes longer than the scanner ttl. + var i = 0 + rdd.foreach { row => + // Sleep for half the ttl for the first few rows. This ensures + // we are on the same tablet and will go past the ttl without + // a new scan request. It also ensures a single row doesn't go + // longer than the ttl. + if (i < 5) { + Thread.sleep(shortScannerTtlMs / 2) // Sleep for half the ttl. + i = i + 1 + } + } + } + + // Test that a keepAlivePeriodMs less than the scanner ttl is successful. + val goodRdd = kuduContext.kuduRDD( + ss.sparkContext, + tableName, + List("key"), + KuduReadOptions( + batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows. + keepAlivePeriodMs = shortScannerTtlMs / 4) + ) + processRDD(goodRdd) + + // Test that a keepAlivePeriodMs greater than the scanner ttl fails. + val badRdd = kuduContext.kuduRDD( + ss.sparkContext, + tableName, + List("key"), + KuduReadOptions( + batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows. + keepAlivePeriodMs = shortScannerTtlMs * 2) + ) + try { + processRDD(badRdd) + fail("Should throw a scanner not found exception") + } catch { + case ex: SparkException => + assert(ex.getMessage.matches("(?s).*Scanner .* not found.*")) + } + } }
