Repository: spark Updated Branches: refs/heads/branch-1.4 01d402298 -> acc877a98
[SPARK-7318] [STREAMING] DStream cleans objects that are not closures I added a check in `ClosureCleaner#clean` that logs a warning and skips cleaning (rather than failing fast) if this is detected in the future. tdas Author: Andrew Or <[email protected]> Closes #5860 from andrewor14/streaming-closure-cleaner and squashes the following commits: 8e971d7 [Andrew Or] Do not throw exception if object to clean is not closure 5ee4e25 [Andrew Or] Fix tests eed3390 [Andrew Or] Merge branch 'master' of github.com:apache/spark into streaming-closure-cleaner 67eeff4 [Andrew Or] Add tests a4fa768 [Andrew Or] Clean the closure, not the RDD (cherry picked from commit 57e9f29e17d97ed9d0f110fb2ce5a075b854a841) Signed-off-by: Andrew Or <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acc877a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acc877a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acc877a9 Branch: refs/heads/branch-1.4 Commit: acc877a989207789ad4bfec3bae43f484486f7a2 Parents: 01d4022 Author: Andrew Or <[email protected]> Authored: Tue May 5 09:37:49 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Tue May 5 09:37:56 2015 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 5 +++++ .../main/scala/org/apache/spark/streaming/dstream/DStream.scala | 3 ++- .../test/scala/org/apache/spark/streaming/ReceiverSuite.scala | 4 ++-- 3 files changed, 9 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/acc877a9/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 19fe6cb..6fe32e4 100644 --- 
a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -179,6 +179,11 @@ private[spark] object ClosureCleaner extends Logging { cleanTransitively: Boolean, accessedFields: Map[Class[_], Set[String]]): Unit = { + if (!isClosure(func.getClass)) { + logWarning("Expected a closure; got " + func.getClass.getName) + return + } + // TODO: clean all inner closures first. This requires us to find the inner objects. // TODO: cache outerClasses / innerClasses / accessedFields http://git-wip-us.apache.org/repos/asf/spark/blob/acc877a9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 83d41f5..f1f8a70 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] ( // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean - transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false)) + val cleanedF = context.sparkContext.clean(transformFunc, false) + transform((r: RDD[T], t: Time) => cleanedF(r)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/acc877a9/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 393a360..5d71276 100644 
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { } withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc => - val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true)) - val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true)) + val receiver1 = new FakeReceiver(sendData = true) + val receiver2 = new FakeReceiver(sendData = true) val receiverStream1 = ssc.receiverStream(receiver1) val receiverStream2 = ssc.receiverStream(receiver2) receiverStream1.register() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
