Repository: spark
Updated Branches:
  refs/heads/branch-2.4 67f2cb6e0 -> 76514a015


[SPARK-25456][SQL][TEST] Fix PythonForeachWriterSuite

PythonForeachWriterSuite was failing because RowQueue now needs a handle on a
SparkEnv with a SerializerManager, so this change adds a mock SparkEnv with a
serializer manager to the suite.
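
As a rough standalone sketch of that approach (the object and helper name below
are illustrative, not part of the patch): stub only SparkEnv.serializerManager
on a Mockito mock and register it with SparkEnv.set, so code such as RowQueue
that looks up the global SparkEnv finds a real SerializerManager:

    import org.mockito.Mockito.{mock, when}

    import org.apache.spark.{SparkConf, SparkEnv}
    import org.apache.spark.serializer.{JavaSerializer, SerializerManager}

    object MockSparkEnvSketch {
      // Build a SparkEnv mock that only answers serializerManager, then make it
      // visible to code that calls SparkEnv.get.
      def installMockEnv(): SparkEnv = {
        val conf = new SparkConf()
        val serializerManager = new SerializerManager(new JavaSerializer(conf), conf, None)
        val mockEnv = mock(classOf[SparkEnv])
        when(mockEnv.serializerManager).thenReturn(serializerManager)
        SparkEnv.set(mockEnv)
        mockEnv
      }
    }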

Also fixed a typo in the `finally` block (`tester == null` where it should be
`tester != null`) that was hiding the real exception behind a NullPointerException.
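
For reference, a hypothetical minimal reproduction of that masking behaviour
(none of these names are from the suite): if construction fails, `tester` stays
null, and the old check then calls close() on null from the `finally` block, so
the NullPointerException replaces the original failure.

    object FinallyMaskingSketch {
      class Tester extends AutoCloseable {
        override def close(): Unit = println("closed")
      }

      // Stands in for `new BufferTester(...)` failing, e.g. when no SparkEnv is set.
      def build(): Tester = throw new IllegalStateException("real failure")

      def main(args: Array[String]): Unit = {
        var tester: Tester = null
        try {
          tester = build()
        } finally {
          // Buggy version: if (tester == null) tester.close()
          // That NullPointerException from the finally block would replace
          // "real failure". The fix inverts the check:
          if (tester != null) tester.close()
        }
      }
    }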

Tested PythonForeachWriterSuite locally; full tests run via Jenkins.

Closes #22452 from squito/SPARK-25456.

Authored-by: Imran Rashid <iras...@cloudera.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>
(cherry picked from commit a6f37b0742d87d5c8ee3e134999d665e5719e822)
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76514a01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76514a01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76514a01

Branch: refs/heads/branch-2.4
Commit: 76514a015168de8d8b54b3abf6b835050eefd8c2
Parents: 67f2cb6
Author: Imran Rashid <iras...@cloudera.com>
Authored: Tue Sep 18 16:33:37 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Sep 18 16:33:49 2018 -0500

----------------------------------------------------------------------
 .../execution/python/PythonForeachWriterSuite.scala   | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76514a01/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
index 07e6034..d02014c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
@@ -19,17 +19,20 @@ package org.apache.spark.sql.execution.python
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.mockito.Mockito.when
 import org.scalatest.concurrent.Eventually
+import org.scalatest.mockito.MockitoSugar
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.python.PythonForeachWriter.UnsafeRowBuffer
 import org.apache.spark.sql.types.{DataType, IntegerType}
 import org.apache.spark.util.Utils
 
-class PythonForeachWriterSuite extends SparkFunSuite with Eventually {
+class PythonForeachWriterSuite extends SparkFunSuite with Eventually with MockitoSugar {
 
   testWithBuffer("UnsafeRowBuffer: iterator blocks when no data is available") { b =>
     b.assertIteratorBlocked()
@@ -75,7 +78,7 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually {
         tester = new BufferTester(memBytes, sleepPerRowReadMs)
         f(tester)
       } finally {
-        if (tester == null) tester.close()
+        if (tester != null) tester.close()
       }
     }
   }
@@ -83,7 +86,12 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually {
 
   class BufferTester(memBytes: Long, sleepPerRowReadMs: Int) {
     private val buffer = {
-      val mem = new TestMemoryManager(new SparkConf())
+      val mockEnv = mock[SparkEnv]
+      val conf = new SparkConf()
+      val serializerManager = new SerializerManager(new JavaSerializer(conf), conf, None)
+      when(mockEnv.serializerManager).thenReturn(serializerManager)
+      SparkEnv.set(mockEnv)
+      val mem = new TestMemoryManager(conf)
       mem.limit(memBytes)
       val taskM = new TaskMemoryManager(mem, 0)
       new UnsafeRowBuffer(taskM, Utils.createTempDir(), 1)

