Repository: spark Updated Branches: refs/heads/master be5fc6ef7 -> b08b50045
[SPARK-18620][STREAMING][KINESIS] Flatten input rates in timeline for streaming + kinesis ## What changes were proposed in this pull request? This pr is to make input rates in timeline more flat for spark streaming + kinesis. Since kinesis workers fetch records and push them into block generators in bulk, timeline in web UI has many spikes when `maxRates` applied (See a Figure.1 below). This fix splits fetched input records into multiple `addRecords` calls. Figure.1 Apply `maxRates=500` in vanilla Spark <img width="1084" alt="apply_limit in_vanilla_spark" src="https://cloud.githubusercontent.com/assets/692303/20823861/4602f300-b89b-11e6-95f3-164a37061305.png"> Figure.2 Apply `maxRates=500` in Spark with my patch <img width="1056" alt="apply_limit in_spark_with_my_patch" src="https://cloud.githubusercontent.com/assets/692303/20823882/6c46352c-b89b-11e6-81ab-afd8abfe0cfe.png"> ## How was this patch tested? Add tests to check that input records are split into multiple `addRecords` calls. Author: Takeshi YAMAMURO <linguin....@gmail.com> Closes #16114 from maropu/SPARK-18620. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b08b5004 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b08b5004 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b08b5004 Branch: refs/heads/master Commit: b08b5004563b28d10b07b70946a9f72408ed228a Parents: be5fc6e Author: Takeshi YAMAMURO <linguin....@gmail.com> Authored: Sat Dec 10 05:32:04 2016 +0800 Committer: Sean Owen <so...@cloudera.com> Committed: Sat Dec 10 05:32:04 2016 +0800 ---------------------------------------------------------------------- .../spark/streaming/kinesis/KinesisReceiver.scala | 6 ++++++ .../streaming/kinesis/KinesisRecordProcessor.scala | 14 ++++++++++++-- .../streaming/kinesis/KinesisReceiverSuite.scala | 17 +++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 858368d..393e56a 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T]( } } + /** Return the current rate limit defined in [[BlockGenerator]]. 
*/ + private[kinesis] def getCurrentLimit: Int = { + assert(blockGenerator != null) + math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt + } + /** Get the latest sequence number for the given shard that can be checkpointed through KCL */ private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = { Option(shardIdToLatestStoredSeqNum.get(shardId)) http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index a0ccd08..73ccc4a 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -68,8 +68,18 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) { if (!receiver.isStopped()) { try { - receiver.addRecords(shardId, batch) - logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") + // Limit the number of processed records from Kinesis stream. This is because the KCL cannot + // control the number of aggregated records to be fetched even if we set `MaxRecords` + // in `KinesisClientLibConfiguration`. For example, if we set 10 to the number of max + // records in a worker and a producer aggregates two records into one message, the worker + // possibly receives 20 records every time the callback function is called. 
+ val maxRecords = receiver.getCurrentLimit + for (start <- 0 until batch.size by maxRecords) { + val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size)) + receiver.addRecords(shardId, miniBatch) + logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records " + + s"for shardId $shardId") + } receiver.setCheckpointer(shardId, checkpointer) } catch { case NonFatal(e) => http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index deac909..800502a 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -69,6 +69,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft test("process records including store and set checkpointer") { when(receiverMock.isStopped()).thenReturn(false) + when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue) val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) recordProcessor.initialize(shardId) @@ -79,8 +80,23 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock) } + test("split into multiple processes if a limitation is set") { + when(receiverMock.isStopped()).thenReturn(false) + when(receiverMock.getCurrentLimit).thenReturn(1) + + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) + recordProcessor.initialize(shardId) + recordProcessor.processRecords(batch, checkpointerMock) + + 
verify(receiverMock, times(1)).isStopped() + verify(receiverMock, times(1)).addRecords(shardId, batch.subList(0, 1)) + verify(receiverMock, times(1)).addRecords(shardId, batch.subList(1, 2)) + verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock) + } + test("shouldn't store and update checkpointer when receiver is stopped") { when(receiverMock.isStopped()).thenReturn(true) + when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue) val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) recordProcessor.processRecords(batch, checkpointerMock) @@ -92,6 +108,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft test("shouldn't update checkpointer when exception occurs during store") { when(receiverMock.isStopped()).thenReturn(false) + when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue) when( receiverMock.addRecords(anyString, anyListOf(classOf[Record])) ).thenThrow(new RuntimeException()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org