Repository: spark
Updated Branches:
  refs/heads/master be5fc6ef7 -> b08b50045


[SPARK-18620][STREAMING][KINESIS] Flatten input rates in timeline for streaming 
+ kinesis

## What changes were proposed in this pull request?
This PR makes input rates in the timeline flatter for Spark Streaming + 
Kinesis.
Since kinesis workers fetch records and push them into block generators in 
bulk, timeline in web UI has many spikes when `maxRates` applied (See a 
Figure.1 below). This fix splits fetched input records into multiple 
`addRecords` calls.

Figure.1 Apply `maxRates=500` in vanilla Spark
<img width="1084" alt="apply_limit in_vanilla_spark" 
src="https://cloud.githubusercontent.com/assets/692303/20823861/4602f300-b89b-11e6-95f3-164a37061305.png";>

Figure.2 Apply `maxRates=500` in Spark with my patch
<img width="1056" alt="apply_limit in_spark_with_my_patch" 
src="https://cloud.githubusercontent.com/assets/692303/20823882/6c46352c-b89b-11e6-81ab-afd8abfe0cfe.png";>

## How was this patch tested?
Add tests to check that input records are split into multiple `addRecords` calls.

Author: Takeshi YAMAMURO <linguin....@gmail.com>

Closes #16114 from maropu/SPARK-18620.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b08b5004
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b08b5004
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b08b5004

Branch: refs/heads/master
Commit: b08b5004563b28d10b07b70946a9f72408ed228a
Parents: be5fc6e
Author: Takeshi YAMAMURO <linguin....@gmail.com>
Authored: Sat Dec 10 05:32:04 2016 +0800
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Dec 10 05:32:04 2016 +0800

----------------------------------------------------------------------
 .../spark/streaming/kinesis/KinesisReceiver.scala  |  6 ++++++
 .../streaming/kinesis/KinesisRecordProcessor.scala | 14 ++++++++++++--
 .../streaming/kinesis/KinesisReceiverSuite.scala   | 17 +++++++++++++++++
 3 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 858368d..393e56a 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T](
     }
   }
 
+  /** Return the current rate limit defined in [[BlockGenerator]]. */
+  private[kinesis] def getCurrentLimit: Int = {
+    assert(blockGenerator != null)
+    math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt
+  }
+
   /** Get the latest sequence number for the given shard that can be 
checkpointed through KCL */
   private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): 
Option[String] = {
     Option(shardIdToLatestStoredSeqNum.get(shardId))

http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index a0ccd08..73ccc4a 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -68,8 +68,18 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: 
KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: 
IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for 
shardId $shardId")
+        // Limit the number of processed records from Kinesis stream. This is 
because the KCL cannot
+        // control the number of aggregated records to be fetched even if we 
set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set 10 to 
the number of max
+        // records in a worker and a producer aggregates two records into one 
+        // message, the worker
+        // possibly receives 20 records every time the callback function is called.
+        val maxRecords = receiver.getCurrentLimit
+        for (start <- 0 until batch.size by maxRecords) {
+          val miniBatch = batch.subList(start, math.min(start + maxRecords, 
batch.size))
+          receiver.addRecords(shardId, miniBatch)
+          logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records 
" +
+            s"for shardId $shardId")
+        }
         receiver.setCheckpointer(shardId, checkpointer)
       } catch {
         case NonFatal(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index deac909..800502a 100644
--- 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -69,6 +69,7 @@ class KinesisReceiverSuite extends TestSuiteBase with 
Matchers with BeforeAndAft
 
   test("process records including store and set checkpointer") {
     when(receiverMock.isStopped()).thenReturn(false)
+    when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
 
     val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
     recordProcessor.initialize(shardId)
@@ -79,8 +80,23 @@ class KinesisReceiverSuite extends TestSuiteBase with 
Matchers with BeforeAndAft
     verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
   }
 
+  test("split into multiple processes if a limitation is set") {
+    when(receiverMock.isStopped()).thenReturn(false)
+    when(receiverMock.getCurrentLimit).thenReturn(1)
+
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+    recordProcessor.initialize(shardId)
+    recordProcessor.processRecords(batch, checkpointerMock)
+
+    verify(receiverMock, times(1)).isStopped()
+    verify(receiverMock, times(1)).addRecords(shardId, batch.subList(0, 1))
+    verify(receiverMock, times(1)).addRecords(shardId, batch.subList(1, 2))
+    verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
+  }
+
   test("shouldn't store and update checkpointer when receiver is stopped") {
     when(receiverMock.isStopped()).thenReturn(true)
+    when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
 
     val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
     recordProcessor.processRecords(batch, checkpointerMock)
@@ -92,6 +108,7 @@ class KinesisReceiverSuite extends TestSuiteBase with 
Matchers with BeforeAndAft
 
   test("shouldn't update checkpointer when exception occurs during store") {
     when(receiverMock.isStopped()).thenReturn(false)
+    when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
     when(
       receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
     ).thenThrow(new RuntimeException())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to