Repository: spark Updated Branches: refs/heads/master 96a59109a -> 22a9d064e
[SPARK-14914][CORE] Fix Resource not closed after using, for unit tests and example ## What changes were proposed in this pull request? This is a follow-up work of #15618. Close file source; For any newly created streaming context outside the withContext, explicitly close the context. ## How was this patch tested? Existing unit tests. Author: [email protected] <[email protected]> Closes #15818 from wangmiao1981/rtest. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22a9d064 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22a9d064 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22a9d064 Branch: refs/heads/master Commit: 22a9d064e95af71f757113f1869f754cc862df35 Parents: 96a5910 Author: [email protected] <[email protected]> Authored: Thu Nov 10 10:54:36 2016 +0000 Committer: Sean Owen <[email protected]> Committed: Thu Nov 10 10:54:36 2016 +0000 ---------------------------------------------------------------------- .../spark/util/MutableURLClassLoaderSuite.scala | 14 ++++++++++++++ .../scala/org/apache/spark/examples/LocalFileLR.scala | 4 +++- .../streaming/kafka010/DirectKafkaStreamSuite.scala | 2 ++ .../streaming/kafka/DirectKafkaStreamSuite.scala | 2 ++ .../spark/streaming/kafka/KafkaStreamSuite.scala | 1 + .../hive/thriftserver/HiveThriftServer2Suites.scala | 7 ++++++- .../org/apache/spark/streaming/CheckpointSuite.scala | 1 + .../spark/streaming/scheduler/JobGeneratorSuite.scala | 1 + 8 files changed, 30 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala index 8b53d4f..f6ac89f 100644 --- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala @@ -51,6 +51,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { assert(fakeClassVersion === "1") val fakeClass2 = classLoader.loadClass("FakeClass2").newInstance() assert(fakeClass.getClass === fakeClass2.getClass) + classLoader.close() + parentLoader.close() } test("parent first") { @@ -61,6 +63,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { assert(fakeClassVersion === "2") val fakeClass2 = classLoader.loadClass("FakeClass1").newInstance() assert(fakeClass.getClass === fakeClass2.getClass) + classLoader.close() + parentLoader.close() } test("child first can fall back") { @@ -69,6 +73,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { val fakeClass = classLoader.loadClass("FakeClass3").newInstance() val fakeClassVersion = fakeClass.toString assert(fakeClassVersion === "2") + classLoader.close() + parentLoader.close() } test("child first can fail") { @@ -77,6 +83,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { intercept[java.lang.ClassNotFoundException] { classLoader.loadClass("FakeClassDoesNotExist").newInstance() } + classLoader.close() + parentLoader.close() } test("default JDK classloader get resources") { @@ -84,6 +92,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { val classLoader = new URLClassLoader(fileUrlsChild, parentLoader) assert(classLoader.getResources("resource1").asScala.size === 2) assert(classLoader.getResources("resource2").asScala.size === 1) + classLoader.close() + parentLoader.close() } test("parent first get resources") { @@ -91,6 +101,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { val classLoader = new MutableURLClassLoader(fileUrlsChild, parentLoader) assert(classLoader.getResources("resource1").asScala.size === 2) assert(classLoader.getResources("resource2").asScala.size === 1) + classLoader.close() + parentLoader.close() } test("child first get resources") { @@ -103,6 +115,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { res1.map(scala.io.Source.fromURL(_).mkString) should contain inOrderOnly ("resource1Contents-child", "resource1Contents-parent") + classLoader.close() + parentLoader.close() } http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala index 3d02ce0..a897cad 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala @@ -51,7 +51,8 @@ object LocalFileLR { showWarning() - val lines = scala.io.Source.fromFile(args(0)).getLines().toArray + val fileSrc = scala.io.Source.fromFile(args(0)) + val lines = fileSrc.getLines().toArray val points = lines.map(parsePoint _) val ITERATIONS = args(1).toInt @@ -69,6 +70,7 @@ object LocalFileLR { w -= gradient } + fileSrc.close() println("Final w: " + w) } } http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 02aec43..c81836d 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -272,6 +272,7 @@ class DirectKafkaStreamSuite collectedData.contains("b") } assert(!collectedData.contains("a")) + ssc.stop() } @@ -324,6 +325,7 @@ class DirectKafkaStreamSuite collectedData.contains("b") } assert(!collectedData.contains("a")) + ssc.stop() } // Test to verify the offset ranges can be recovered from the checkpoints http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index ab1c505..8a747a5 100644 --- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -184,6 +184,7 @@ class DirectKafkaStreamSuite collectedData.contains("b") } assert(!collectedData.contains("a")) + ssc.stop() } @@ -230,6 +231,7 @@ class DirectKafkaStreamSuite collectedData.contains("b") } assert(!collectedData.contains("a")) + ssc.stop() } // Test to verify the offset ranges can be recovered from the checkpoints http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 6a35ac1..426cd83 100644 --- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -80,5 +80,6 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { assert(result.synchronized { sent === result }) } + ssc.stop() } } http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 8f2c4fa..5d20ec9 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -609,7 +609,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { test("SPARK-11043 check operation log root directory") { val expectedLine = "Operation log root directory is created: " + operationLogPath.getAbsoluteFile - assert(Source.fromFile(logPath).getLines().exists(_.contains(expectedLine))) + val bufferSrc = Source.fromFile(logPath) + Utils.tryWithSafeFinally { + assert(bufferSrc.getLines().exists(_.contains(expectedLine))) + } { + bufferSrc.close() + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 41f16bf..a1e9d1e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -815,6 +815,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val ois = new ObjectInputStreamWithLoader( new ByteArrayInputStream(bos.toByteArray), loader) assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;") + ois.close() } test("SPARK-11267: the race condition of two checkpoints in a batch") { http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala index a2dbae1..5f7f7fa 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala @@ -123,6 +123,7 @@ class JobGeneratorSuite extends TestSuiteBase { assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already deleted") assert(batchCounter.getNumCompletedBatches < longBatchNumber) waitLatch.countDown() + ssc.stop() } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
