Repository: spark
Updated Branches:
  refs/heads/branch-1.4 1a134e5d4 -> b928db4fe
[SPARK-7838] [STREAMING] Set scope for kinesis stream

Author: Tathagata Das <[email protected]>

Closes #6369 from tdas/SPARK-7838 and squashes the following commits:

87d1c7f [Tathagata Das] Addressed comment
37775d8 [Tathagata Das] set scope for kinesis stream

(cherry picked from commit baa89838cca96fa091c9e5ce62be01e1a265d820)
Signed-off-by: Andrew Or <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b928db4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b928db4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b928db4f

Branch: refs/heads/branch-1.4
Commit: b928db4fe3892c8da07be0bf4979aecb11c5feab
Parents: 1a134e5
Author: Tathagata Das <[email protected]>
Authored: Fri May 22 23:05:54 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Fri May 22 23:06:01 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/kinesis/KinesisUtils.scala | 9 ++++++---
 .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 +-
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b928db4f/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index b114bcf..2531aeb 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -63,9 +63,12 @@ object KinesisUtils {
       checkpointInterval: Duration,
       storageLevel: StorageLevel
     ): ReceiverInputDStream[Array[Byte]] = {
-    ssc.receiverStream(
-      new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
-        initialPositionInStream, checkpointInterval, storageLevel, None))
+    // Setting scope to override receiver stream's scope of "receiver stream"
+    ssc.withNamedScope("kinesis stream") {
+      ssc.receiverStream(
+        new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
+          initialPositionInStream, checkpointInterval, storageLevel, None))
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b928db4f/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 7b77d44..5e58ed7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -262,7 +262,7 @@ class StreamingContext private[streaming] (
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private def withNamedScope[U](name: String)(body: => U): U = {
+  private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
     RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
