Repository: kudu Updated Branches: refs/heads/master daee55509 -> 5a91629e5
spark: Add support for scanRequestTimeout to DefaultSource The timeout was manually tested via spark-shell and when set to a very low value (1ms), a combination of error messages and timeout warnings were printed due to Spark task retries. Added a simple verification test to ensure that parameter parsing works. Change-Id: I62b01af9d0532448fc3a6cf9328b3912a7865ebb Reviewed-on: http://gerrit.cloudera.org:8080/10725 Tested-by: Kudu Jenkins Reviewed-by: Dan Burkert <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5a91629e Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5a91629e Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5a91629e Branch: refs/heads/master Commit: 5a91629e5f2c524023105c5d39ee23a0f8416a87 Parents: daee555 Author: Mike Percy <[email protected]> Authored: Tue Jun 12 18:46:10 2018 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Jul 13 22:00:18 2018 +0000 ---------------------------------------------------------------------- .../apache/kudu/spark/kudu/DefaultSource.scala | 18 ++++++++---- .../apache/kudu/spark/kudu/KuduContext.scala | 3 +- .../org/apache/kudu/spark/kudu/KuduRDD.scala | 6 ++++ .../kudu/spark/kudu/DefaultSourceTest.scala | 30 +++++++++++++++++++- 4 files changed, 50 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/5a91629e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala index 18cdd07..dd5b824 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala +++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala @@ -51,9 +51,14 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider val SCAN_LOCALITY = "kudu.scanLocality" val IGNORE_NULL = "kudu.ignoreNull" val IGNORE_DUPLICATE_ROW_ERRORS = "kudu.ignoreDuplicateRowErrors" + val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs" def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName + def getScanRequestTimeoutMs(parameters: Map[String, String]): Option[Long] = { + parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong) + } + /** * Construct a BaseRelation using the provided context and parameters. * @@ -62,8 +67,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider * @return a BaseRelation Object */ override def createRelation(sqlContext: SQLContext, - parameters: Map[String, String]): - BaseRelation = { + parameters: Map[String, String]): BaseRelation = { val tableName = parameters.getOrElse(TABLE_KEY, throw new IllegalArgumentException( s"Kudu table name must be specified in create options using key '$TABLE_KEY'")) @@ -78,7 +82,8 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider val writeOptions = new KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull) new KuduRelation(tableName, kuduMaster, faultTolerantScanner, - scanLocality, operationType, None, writeOptions)(sqlContext) + scanLocality, getScanRequestTimeoutMs(parameters), operationType, None, + writeOptions)(sqlContext) } /** @@ -114,7 +119,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "closest_replica")) new KuduRelation(tableName, kuduMaster, faultTolerantScanner, - scanLocality, operationType, Some(schema))(sqlContext) + scanLocality, getScanRequestTimeoutMs(parameters), operationType, Some(schema))(sqlContext) } private def getOperationType(opParam: String): OperationType 
= { @@ -146,6 +151,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider * otherwise, use non fault tolerant one * @param scanLocality If true scan locality is enabled, so that the scan will * take place at the closest replica. + * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds * @param operationType The default operation type to perform when writing to the relation * @param userSchema A schema used to select columns for the relation * @param writeOptions Kudu write options @@ -156,6 +162,7 @@ class KuduRelation(private val tableName: String, private val masterAddrs: String, private val faultTolerantScanner: Boolean, private val scanLocality: ReplicaSelection, + private[kudu] val scanRequestTimeoutMs: Option[Long], private val operationType: OperationType, private val userSchema: Option[StructType], private val writeOptions: KuduWriteOptions = new KuduWriteOptions)( @@ -192,7 +199,8 @@ class KuduRelation(private val tableName: String, override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { val predicates = filters.flatMap(filterToPredicate) new KuduRDD(context, 1024 * 1024 * 20, requiredColumns, predicates, - table, faultTolerantScanner, scanLocality, sqlContext.sparkContext) + table, faultTolerantScanner, scanLocality, scanRequestTimeoutMs, + sqlContext.sparkContext) } /** http://git-wip-us.apache.org/repos/asf/kudu/blob/5a91629e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala index af56606..a27e395 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala @@ -127,7 +127,8 @@ class 
KuduContext(val kuduMaster: String, // TODO: provide an elegant way to pass various options (faultTolerantScan, // TODO: localityScan, etc) to KuduRDD new KuduRDD(this, 1024*1024*20, columnProjection.toArray, Array(), - syncClient.openTable(tableName), false, ReplicaSelection.LEADER_ONLY, sc) + syncClient.openTable(tableName), false, ReplicaSelection.LEADER_ONLY, + None, sc) } /** http://git-wip-us.apache.org/repos/asf/kudu/blob/5a91629e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala index a92ec3e..7117983 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala @@ -37,6 +37,7 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext, @transient val table: KuduTable, @transient val isFaultTolerant: Boolean, @transient val scanLocality: ReplicaSelection, + @transient val scanRequestTimeoutMs: Option[Long], @transient val sc: SparkContext) extends RDD[Row](sc, Nil) { override protected def getPartitions: Array[Partition] = { @@ -56,6 +57,11 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext, .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) } + scanRequestTimeoutMs match { + case Some(timeout) => builder.scanRequestTimeout(timeout) + case _ => + } + for (predicate <- predicates) { builder.addPredicate(predicate) } http://git-wip-us.apache.org/repos/asf/kudu/blob/5a91629e/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala 
index ec4d37c..701e398 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -19,7 +19,7 @@ package org.apache.kudu.spark.kudu import scala.collection.JavaConverters._ import scala.collection.immutable.IndexedSeq import scala.util.control.NonFatal -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.junit.Assert._ @@ -29,6 +29,7 @@ import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers} import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder import org.apache.kudu.client.CreateTableOptions import org.apache.kudu.{Schema, Type} +import org.apache.spark.sql.execution.datasources.LogicalRelation @RunWith(classOf[JUnitRunner]) class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfterEach with Matchers { @@ -706,4 +707,31 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfterEac kuduContext.insertRows(updateDF, tableName, kuduWriteOptions) assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp) } + + /** + * Assuming that the only part of the logical plan is a Kudu scan, this + * function extracts the KuduRelation from the passed DataFrame for + * testing purposes. + */ + def kuduRelationFromDataFrame(dataFrame: DataFrame) = { + val logicalPlan = dataFrame.queryExecution.logical + val logicalRelation = logicalPlan.asInstanceOf[LogicalRelation] + val baseRelation = logicalRelation.relation + baseRelation.asInstanceOf[KuduRelation] + } + + /** + * Verify that the kudu.scanRequestTimeoutMs parameter is parsed by the + * DefaultSource and makes it into the KuduRelation as a configuration + * parameter. 
+ */ + test("scan request timeout propagation") { + kuduOptions = Map( + "kudu.table" -> tableName, + "kudu.master" -> miniCluster.getMasterAddresses, + "kudu.scanRequestTimeoutMs" -> "1") + val dataFrame = sqlContext.read.options(kuduOptions).kudu + val kuduRelation = kuduRelationFromDataFrame(dataFrame) + assert(kuduRelation.scanRequestTimeoutMs == Some(1)) + } }
