Repository: spark
Updated Branches:
  refs/heads/branch-2.2 75e5ea294 -> 7076ab40f


[SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries

## What changes were proposed in this pull request?

This pull request proposes to remove the hardcoded Amazon Kinesis retry values 
MIN_RETRY_WAIT_TIME_MS and MAX_RETRIES.

This change is critical for Kinesis checkpoint recovery when the Kinesis-backed 
RDD is huge.
The following happens in a typical Kinesis recovery:
- Kinesis throttles the large number of read requests issued while recovering
- the retries issued on throttling cannot recover, because the wait period 
between them is too small
- Kinesis throttles per second, so the wait period should be configurable for 
recovery (the retry loop is sketched below)
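
For context, here is a condensed sketch of the retry-with-backoff loop that 
this patch makes configurable. The helper name `retryOrTimeoutSketch` is 
illustrative only; the actual implementation is `retryOrTimeout` in 
KinesisBackedBlockRDD.scala (shown in the diff below), which retries only 
throttling-related exceptions rather than all non-fatal errors:

```scala
import scala.util.control.NonFatal

// Exponential backoff bounded by both a retry budget and a wall-clock timeout.
// maxRetries, retryWaitTimeMs and retryTimeoutMs are the values that the new
// configuration keys control.
def retryOrTimeoutSketch[T](maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long)
    (body: => T): T = {
  val startTimeMs = System.currentTimeMillis()
  var waitTimeMs = retryWaitTimeMs   // doubled after every failed attempt
  var retryCount = 0
  var result: Option[T] = None
  while (result.isEmpty && retryCount < maxRetries &&
      System.currentTimeMillis() - startTimeMs < retryTimeoutMs) {
    if (retryCount > 0) {            // wait only if this is a retry
      Thread.sleep(waitTimeMs)
      waitTimeMs *= 2                // double the wait for the next round
    }
    try {
      result = Some(body)
    } catch {
      case NonFatal(_) => retryCount += 1
    }
  }
  result.getOrElse(
    throw new RuntimeException(s"Gave up after $retryCount retries"))
}
```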

The patch reads these settings from the Spark configuration:
- spark.streaming.kinesis.retry.waitTime
- spark.streaming.kinesis.retry.maxAttempts
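
For illustration, a minimal sketch of how a streaming job could raise these 
limits. The application name and the values `1000ms` and `10` are hypothetical; 
the keys and their defaults (`100ms`, 3 attempts) come from the new 
KinesisReadConfigurations below:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical tuning for recovery of a large Kinesis-backed RDD: wait longer
// between retries and allow more attempts so that throttled reads can succeed.
val conf = new SparkConf()
  .setAppName("kinesis-recovery-example")                   // illustrative name
  .set("spark.streaming.kinesis.retry.waitTime", "1000ms")  // default: 100ms
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")   // default: 3
val ssc = new StreamingContext(conf, Seconds(10))
```

Note that the retry timeout itself has no separate key: when the RDD is created 
through a StreamingContext it uses the batch duration, and it falls back to 
10000 ms when the RDD is constructed directly.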

Jira : https://issues.apache.org/jira/browse/SPARK-20140

## How was this patch tested?

Modified KinesisStreamSuite.scala to run the Kinesis tests with the modified 
configurations. Wasn't able to test the patch with actual throttling.

Author: Yash Sharma <ysha...@atlassian.com>

Closes #17467 from yssharma/ysharma/spark-kinesis-retries.

(cherry picked from commit 38f4e8692ce3b6cbcfe0c1aff9b5e662f7a308b7)
Signed-off-by: Burak Yavuz <brk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7076ab40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7076ab40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7076ab40

Branch: refs/heads/branch-2.2
Commit: 7076ab40f86fe606cd9b813dad506e921501383e
Parents: 75e5ea2
Author: Yash Sharma <ysha...@atlassian.com>
Authored: Tue May 16 15:08:05 2017 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Tue May 16 15:08:46 2017 -0700

----------------------------------------------------------------------
 .../kinesis/KinesisBackedBlockRDD.scala         | 33 ++++-----
 .../streaming/kinesis/KinesisInputDStream.scala |  6 +-
 .../kinesis/KinesisReadConfigurations.scala     | 78 ++++++++++++++++++++
 .../streaming/kinesis/KinesisStreamSuite.scala  | 49 +++++++++++-
 4 files changed, 143 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7076ab40/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index f31ebf1..88b2942 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.AWSCredentials
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
@@ -81,9 +81,9 @@ class KinesisBackedBlockRDD[T: ClassTag](
     @transient private val _blockIds: Array[BlockId],
     @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
     @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
-    val retryTimeoutMs: Int = 10000,
    val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
-    val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+    val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+    val kinesisReadConfigs: KinesisReadConfigurations = KinesisReadConfigurations()
   ) extends BlockRDD[T](sc, _blockIds) {
 
   require(_blockIds.length == arrayOfseqNumberRanges.length,
@@ -112,7 +112,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
       val credentials = kinesisCreds.provider.getCredentials
       partition.seqNumberRanges.ranges.iterator.flatMap { range =>
         new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
-          range, retryTimeoutMs).map(messageHandler)
+          range, kinesisReadConfigs).map(messageHandler)
       }
     }
     if (partition.isBlockIdValid) {
@@ -135,7 +135,7 @@ class KinesisSequenceRangeIterator(
     endpointUrl: String,
     regionId: String,
     range: SequenceNumberRange,
-    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+    kinesisReadConfigs: KinesisReadConfigurations) extends NextIterator[Record] with Logging {
 
   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
@@ -251,21 +251,19 @@ class KinesisSequenceRangeIterator(
 
  /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
   private def retryOrTimeout[T](message: String)(body: => T): T = {
-    import KinesisSequenceRangeIterator._
-
-    var startTimeMs = System.currentTimeMillis()
+    val startTimeMs = System.currentTimeMillis()
     var retryCount = 0
-    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
     var result: Option[T] = None
     var lastError: Throwable = null
+    var waitTimeInterval = kinesisReadConfigs.retryWaitTimeMs
 
-    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
-    def isMaxRetryDone = retryCount >= MAX_RETRIES
+    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= kinesisReadConfigs.retryTimeoutMs
+    def isMaxRetryDone = retryCount >= kinesisReadConfigs.maxRetries
 
     while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
       if (retryCount > 0) {  // wait only if this is a retry
-        Thread.sleep(waitTimeMs)
-        waitTimeMs *= 2  // if you have waited, then double wait time for next round
+        Thread.sleep(waitTimeInterval)
+        waitTimeInterval *= 2  // if you have waited, then double wait time for next round
       }
       try {
         result = Some(body)
@@ -284,7 +282,8 @@ class KinesisSequenceRangeIterator(
     result.getOrElse {
       if (isTimedOut) {
         throw new SparkException(
-          s"Timed out after $retryTimeoutMs ms while $message, last exception: 
", lastError)
+          s"Timed out after ${kinesisReadConfigs.retryTimeoutMs} ms while " +
+          s"$message, last exception: ", lastError)
       } else {
         throw new SparkException(
           s"Gave up after $retryCount retries while $message, last exception: 
", lastError)
@@ -292,9 +291,3 @@ class KinesisSequenceRangeIterator(
     }
   }
 }
-
-private[streaming]
-object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7076ab40/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index 7755341..decfb6b 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.Record
+import KinesisReadConfigurations._
 
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
@@ -60,12 +61,13 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
       logDebug(s"Creating KinesisBackedBlockRDD for $time with 
${seqNumRanges.length} " +
           s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+
       new KinesisBackedBlockRDD(
         context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
         isBlockIdValid = isBlockIdValid,
-        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
         messageHandler = messageHandler,
-        kinesisCreds = kinesisCreds)
+        kinesisCreds = kinesisCreds,
+        kinesisReadConfigs = KinesisReadConfigurations(ssc))
     } else {
       logWarning("Kinesis sequence number information was not present with 
some block metadata," +
         " it may not be possible to recover from failures")

http://git-wip-us.apache.org/repos/asf/spark/blob/7076ab40/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
new file mode 100644
index 0000000..871071e
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [[KinesisBackedBlockRDD]].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consecutive Kinesis retries.
+ *                         Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+ *                         Defaults to batch duration provided for streaming,
+ *                         else uses 10000 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+    maxRetries: Int,
+    retryWaitTimeMs: Long,
+    retryTimeoutMs: Long)
+
+private[kinesis] object KinesisReadConfigurations {
+  def apply(): KinesisReadConfigurations = {
+    KinesisReadConfigurations(maxRetries = DEFAULT_MAX_RETRIES,
+      retryWaitTimeMs = JavaUtils.timeStringAsMs(DEFAULT_RETRY_WAIT_TIME),
+      retryTimeoutMs = DEFAULT_RETRY_TIMEOUT)
+  }
+
+  def apply(ssc: StreamingContext): KinesisReadConfigurations = {
+    KinesisReadConfigurations(
+      maxRetries = ssc.sc.getConf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES),
+      retryWaitTimeMs = JavaUtils.timeStringAsMs(
+        ssc.sc.getConf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)),
+      retryTimeoutMs = ssc.graph.batchDuration.milliseconds)
+  }
+
+  /**
+   * SparkConf key for configuring the maximum number of retries used when attempting a Kinesis
+   * request.
+   */
+  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
+
+  /**
+   * SparkConf key for configuring the wait time to use before retrying a Kinesis attempt.
+   */
+  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
+
+  /**
+   * Default value for the RETRY_MAX_ATTEMPTS_KEY
+   */
+  val DEFAULT_MAX_RETRIES = 3
+
+  /**
+   * Default value for the RETRY_WAIT_TIME_KEY
+   */
+  val DEFAULT_RETRY_WAIT_TIME = "100ms"
+
+  /**
+   * Default value for the retry timeout
+   */
+  val DEFAULT_RETRY_TIMEOUT = 10000
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7076ab40/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 341a689..7e5bda9 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._
 import org.apache.spark.streaming.kinesis.KinesisTestUtils._
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
@@ -136,7 +137,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
     assert(kinesisRDD.regionName === dummyRegionName)
     assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
-    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
+    assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
     assert(kinesisRDD.kinesisCreds === BasicCredentials(
       awsAccessKeyId = dummyAWSAccessKey,
       awsSecretKey = dummyAWSSecretKey))
@@ -234,6 +235,52 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.stop(stopSparkContext = false)
   }
 
+  test("Kinesis read with custom configurations") {
+    try {
+      ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
+      ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
+
+      val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName("dummyStream")
+      .endpointUrl(dummyEndpointUrl)
+      .regionName(dummyRegionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .build()
+      .asInstanceOf[KinesisInputDStream[Array[Byte]]]
+
+      val time = Time(1000)
+      // Generate block info data for testing
+      val seqNumRanges1 = SequenceNumberRanges(
+        SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
+      val blockId1 = StreamBlockId(kinesisStream.id, 123)
+      val blockInfo1 = ReceivedBlockInfo(
+        0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+      val seqNumRanges2 = SequenceNumberRanges(
+        SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
+      val blockId2 = StreamBlockId(kinesisStream.id, 345)
+      val blockInfo2 = ReceivedBlockInfo(
+        0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+      // Verify that the generated KinesisBackedBlockRDD has all the right information
+      val blockInfos = Seq(blockInfo1, blockInfo2)
+
+      val kinesisRDD =
+        kinesisStream.createBlockRDD(time, blockInfos).asInstanceOf[KinesisBackedBlockRDD[_]]
+
+      assert(kinesisRDD.kinesisReadConfigs.retryWaitTimeMs === 2000)
+      assert(kinesisRDD.kinesisReadConfigs.maxRetries === 5)
+      assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
+    } finally {
+      ssc.sc.conf.remove(RETRY_WAIT_TIME_KEY)
+      ssc.sc.conf.remove(RETRY_MAX_ATTEMPTS_KEY)
+      ssc.stop(stopSparkContext = false)
+    }
+  }
+
   testIfEnabled("split and merge shards in a stream") {
    // Since this test tries to split and merge shards in a stream, we create another
     // temporary stream and then remove it when finished.

