Repository: kudu
Updated Branches:
  refs/heads/master daee55509 -> 5a91629e5


spark: Add support for scanRequestTimeout to DefaultSource

The timeout was manually tested via spark-shell and, when set to a very
low value (1ms), a combination of error messages and timeout warnings
was printed due to Spark task retries.

Added a simple verification test to ensure that parameter parsing works.

Change-Id: I62b01af9d0532448fc3a6cf9328b3912a7865ebb
Reviewed-on: http://gerrit.cloudera.org:8080/10725
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5a91629e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5a91629e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5a91629e

Branch: refs/heads/master
Commit: 5a91629e5f2c524023105c5d39ee23a0f8416a87
Parents: daee555
Author: Mike Percy <[email protected]>
Authored: Tue Jun 12 18:46:10 2018 -0700
Committer: Mike Percy <[email protected]>
Committed: Fri Jul 13 22:00:18 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 18 ++++++++----
 .../apache/kudu/spark/kudu/KuduContext.scala    |  3 +-
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    |  6 ++++
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 30 +++++++++++++++++++-
 4 files changed, 50 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5a91629e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 18cdd07..dd5b824 100644
--- 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -51,9 +51,14 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
   val SCAN_LOCALITY = "kudu.scanLocality"
   val IGNORE_NULL = "kudu.ignoreNull"
   val IGNORE_DUPLICATE_ROW_ERRORS = "kudu.ignoreDuplicateRowErrors"
+  val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs"
 
   def defaultMasterAddrs: String = 
InetAddress.getLocalHost.getCanonicalHostName
 
+  def getScanRequestTimeoutMs(parameters: Map[String, String]): Option[Long] = 
{
+    parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
+  }
+
   /**
     * Construct a BaseRelation using the provided context and parameters.
     *
@@ -62,8 +67,7 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
     * @return           a BaseRelation Object
     */
   override def createRelation(sqlContext: SQLContext,
-                              parameters: Map[String, String]):
-  BaseRelation = {
+                              parameters: Map[String, String]): BaseRelation = 
{
     val tableName = parameters.getOrElse(TABLE_KEY,
       throw new IllegalArgumentException(
         s"Kudu table name must be specified in create options using key 
'$TABLE_KEY'"))
@@ -78,7 +82,8 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
     val writeOptions = new KuduWriteOptions(ignoreDuplicateRowErrors, 
ignoreNull)
 
     new KuduRelation(tableName, kuduMaster, faultTolerantScanner,
-      scanLocality, operationType, None, writeOptions)(sqlContext)
+      scanLocality, getScanRequestTimeoutMs(parameters), operationType, None,
+      writeOptions)(sqlContext)
   }
 
   /**
@@ -114,7 +119,7 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
     val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, 
"closest_replica"))
 
     new KuduRelation(tableName, kuduMaster, faultTolerantScanner,
-      scanLocality, operationType, Some(schema))(sqlContext)
+      scanLocality, getScanRequestTimeoutMs(parameters), operationType, 
Some(schema))(sqlContext)
   }
 
   private def getOperationType(opParam: String): OperationType = {
@@ -146,6 +151,7 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
   *                             otherwise, use non fault tolerant one
   * @param scanLocality If true scan locality is enabled, so that the scan will
   *                     take place at the closest replica.
+  * @param scanRequestTimeoutMs Maximum time allowed per scan request, in 
milliseconds
   * @param operationType The default operation type to perform when writing to 
the relation
   * @param userSchema A schema used to select columns for the relation
   * @param writeOptions Kudu write options
@@ -156,6 +162,7 @@ class KuduRelation(private val tableName: String,
                    private val masterAddrs: String,
                    private val faultTolerantScanner: Boolean,
                    private val scanLocality: ReplicaSelection,
+                   private[kudu] val scanRequestTimeoutMs: Option[Long],
                    private val operationType: OperationType,
                    private val userSchema: Option[StructType],
                    private val writeOptions: KuduWriteOptions = new 
KuduWriteOptions)(
@@ -192,7 +199,8 @@ class KuduRelation(private val tableName: String,
   override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
     val predicates = filters.flatMap(filterToPredicate)
     new KuduRDD(context, 1024 * 1024 * 20, requiredColumns, predicates,
-                table, faultTolerantScanner, scanLocality, 
sqlContext.sparkContext)
+                table, faultTolerantScanner, scanLocality, 
scanRequestTimeoutMs,
+                sqlContext.sparkContext)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/5a91629e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index af56606..a27e395 100644
--- 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -127,7 +127,8 @@ class KuduContext(val kuduMaster: String,
     // TODO: provide an elegant way to pass various options (faultTolerantScan,
     // TODO: localityScan, etc) to KuduRDD
     new KuduRDD(this, 1024*1024*20, columnProjection.toArray, Array(),
-                syncClient.openTable(tableName), false, 
ReplicaSelection.LEADER_ONLY, sc)
+                syncClient.openTable(tableName), false, 
ReplicaSelection.LEADER_ONLY,
+                None, sc)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/5a91629e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index a92ec3e..7117983 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -37,6 +37,7 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
                              @transient val table: KuduTable,
                              @transient val isFaultTolerant: Boolean,
                              @transient val scanLocality: ReplicaSelection,
+                             @transient val scanRequestTimeoutMs: Option[Long],
                              @transient val sc: SparkContext) extends 
RDD[Row](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
@@ -56,6 +57,11 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
              .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
     }
 
+    scanRequestTimeoutMs match {
+      case Some(timeout) => builder.scanRequestTimeout(timeout)
+      case _ =>
+    }
+
     for (predicate <- predicates) {
       builder.addPredicate(predicate)
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5a91629e/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index ec4d37c..701e398 100644
--- 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -19,7 +19,7 @@ package org.apache.kudu.spark.kudu
 import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
 import scala.util.control.NonFatal
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
 import org.junit.Assert._
@@ -29,6 +29,7 @@ import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
 import org.apache.kudu.client.CreateTableOptions
 import org.apache.kudu.{Schema, Type}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 @RunWith(classOf[JUnitRunner])
 class DefaultSourceTest extends FunSuite with TestContext with 
BeforeAndAfterEach with Matchers {
@@ -706,4 +707,31 @@ class DefaultSourceTest extends FunSuite with TestContext 
with BeforeAndAfterEac
     kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
     assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
   }
+
+  /**
+    * Assuming that the only part of the logical plan is a Kudu scan, this
+    * function extracts the KuduRelation from the passed DataFrame for
+    * testing purposes.
+    */
+  def kuduRelationFromDataFrame(dataFrame: DataFrame) = {
+    val logicalPlan = dataFrame.queryExecution.logical
+    val logicalRelation = logicalPlan.asInstanceOf[LogicalRelation]
+    val baseRelation = logicalRelation.relation
+    baseRelation.asInstanceOf[KuduRelation]
+  }
+
+  /**
+    * Verify that the kudu.scanRequestTimeoutMs parameter is parsed by the
+    * DefaultSource and makes it into the KuduRelation as a configuration
+    * parameter.
+    */
+  test("scan request timeout propagation") {
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> miniCluster.getMasterAddresses,
+      "kudu.scanRequestTimeoutMs" -> "1")
+    val dataFrame = sqlContext.read.options(kuduOptions).kudu
+    val kuduRelation = kuduRelationFromDataFrame(dataFrame)
+    assert(kuduRelation.scanRequestTimeoutMs == Some(1))
+  }
 }

Reply via email to