Repository: spark
Updated Branches:
  refs/heads/master ed249db9c -> 6d7bc5af4


[SPARK-25267][SQL][TEST] Disable ConvertToLocalRelation in the test cases of sql/core and sql/hive

## What changes were proposed in this pull request?
In SharedSparkSession and TestHive, disable the optimizer rule ConvertToLocalRelation for better test coverage. Without this rule, test queries built on small in-memory data are no longer collapsed into a pre-computed LocalRelation, so the other optimization rules (for example ConstantPropagation) are actually exercised.

## How was this patch tested?
Identified and fixed the test failures that surfaced after excluding the ConvertToLocalRelation rule.
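For reference, the exclusion boils down to one SQL conf. A minimal sketch (not part of this patch; the app name and master are made up) of a standalone session configured the same way SharedSparkSession and TestHive now are:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
    import org.apache.spark.sql.internal.SQLConf

    // Exclude ConvertToLocalRelation so queries over small in-memory data keep a
    // real optimized plan instead of being pre-computed into a LocalRelation.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("excluded-rules-sketch")
      .config(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
      .getOrCreate()

The same setting can be passed as the plain key spark.sql.optimizer.excludedRules when the internal SQLConf constants are not accessible.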

Closes #22270 from dilipbiswal/SPARK-25267-final.

Authored-by: Dilip Biswal <dbis...@us.ibm.com>
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d7bc5af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d7bc5af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d7bc5af

Branch: refs/heads/master
Commit: 6d7bc5af454341f6d9bfc1e903148ad7ba8de6f9
Parents: ed249db
Author: Dilip Biswal <dbis...@us.ibm.com>
Authored: Thu Sep 6 23:35:02 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Sep 6 23:35:02 2018 -0700

----------------------------------------------------------------------
 .../test/scala/org/apache/spark/ml/util/MLTest.scala  | 10 +++++++++-
 .../resources/sql-tests/inputs/group-by-ordinal.sql   |  4 +++-
 .../sql-tests/results/group-by-ordinal.sql.out        |  4 +++-
 .../apache/spark/sql/DataFrameAggregateSuite.scala    |  2 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala    | 14 ++++++++------
 .../scala/org/apache/spark/sql/DataFrameSuite.scala   |  5 ++---
 .../apache/spark/sql/test/SharedSparkSession.scala    |  6 ++++++
 .../org/apache/spark/sql/hive/test/TestHive.scala     |  8 +++++++-
 8 files changed, 39 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6d7bc5af/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
index 76d41f9..acac171 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
@@ -21,12 +21,13 @@ import java.io.File
 
 import org.scalatest.Suite
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{DebugFilesystem, SparkConf, SparkContext}
 import org.apache.spark.ml.{PredictionModel, Transformer}
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.test.TestSparkSession
 import org.apache.spark.util.Utils
@@ -36,6 +37,13 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite =>
   @transient var sc: SparkContext = _
   @transient var checkpointDir: String = _
 
+  protected override def sparkConf = {
+    new SparkConf()
+      .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+  }
+
   protected override def createSparkSession: TestSparkSession = {
     new TestSparkSession(new SparkContext("local[2]", "MLlibUnitTest", sparkConf))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7bc5af/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql
index 928f766..3144833 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql
@@ -38,7 +38,9 @@ select a, b, sum(b) from data group by 3;
 select a, b, sum(b) + 2 from data group by 3;
 
 -- negative case: nondeterministic expression
-select a, rand(0), sum(b) from data group by a, 2;
+select a, rand(0), sum(b)
+from 
+(select /*+ REPARTITION(1) */ a, b from data) group by a, 2;
 
 -- negative case: star
 select * from data group by a, b, 1;
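
Why the new REPARTITION(1) hint: with ConvertToLocalRelation excluded, the inlined data is no longer collapsed into a single pre-computed LocalRelation, and rand(0) is re-seeded per partition, so the golden output would depend on how rows happen to be distributed. Pinning one partition keeps the result deterministic. A hypothetical sketch of the effect (assumes a running spark session; the column names are made up):

    import spark.implicits._
    import org.apache.spark.sql.functions.rand

    val data = Seq((1, 1), (2, 1), (3, 2)).toDF("a", "b")

    // rand(0) seeds its generator with the seed plus the partition index, so the
    // same rows can get different "random" values under a different partitioning.
    val onePartition   = data.repartition(1).select($"a", rand(0)).collect()
    val manyPartitions = data.repartition(3).select($"a", rand(0)).collect()
    // With a single partition the per-row values no longer depend on row placement,
    // which is what the golden file in group-by-ordinal.sql.out relies on.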

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7bc5af/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
index 9ecbe19..cf5add6 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
@@ -135,7 +135,9 @@ aggregate functions are not allowed in GROUP BY, but found (sum(CAST(data.`b` AS
 
 
 -- !query 13
-select a, rand(0), sum(b) from data group by a, 2
+select a, rand(0), sum(b)
+from 
+(select /*+ REPARTITION(1) */ a, b from data) group by a, 2
 -- !query 13 schema
 struct<a:int,rand(0):double,sum(b):bigint>
 -- !query 13 output

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7bc5af/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 85b3ca1..ed110f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -558,7 +558,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-18004 limit + aggregates") {
     withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
-      val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
+      val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value").repartition(1)
       val limit2Df = df.limit(2)
       checkAnswer(
         limit2Df.groupBy("id").count().select($"id"),

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7bc5af/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 156e543..4b83e51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -85,14 +85,16 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     }
 
     val df5 = Seq((Seq("a", null), Seq(1, 2))).toDF("k", "v")
-    intercept[RuntimeException] {
+    val msg1 = intercept[Exception] {
       df5.select(map_from_arrays($"k", $"v")).collect
-    }
+    }.getMessage
+    assert(msg1.contains("Cannot use null as map key!"))
 
     val df6 = Seq((Seq(1, 2), Seq("a"))).toDF("k", "v")
-    intercept[RuntimeException] {
+    val msg2 = intercept[Exception] {
       df6.select(map_from_arrays($"k", $"v")).collect
-    }
+    }.getMessage
+    assert(msg2.contains("The given two arrays should have the same length"))
   }
 
   test("struct with column name") {
@@ -2377,7 +2379,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(ex2.getMessage.contains(
       "The number of lambda function arguments '3' does not match"))
 
-    val ex3 = intercept[RuntimeException] {
+    val ex3 = intercept[Exception] {
       dfExample1.selectExpr("transform_keys(i, (k, v) -> v)").show()
     }
     assert(ex3.getMessage.contains("Cannot use null as map key!"))
@@ -2697,7 +2699,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-24734: Fix containsNull of Concat for array type") {
     val df = Seq((Seq(1), Seq[Integer](null), Seq("a", "b"))).toDF("k1", "k2", "v")
-    val ex = intercept[RuntimeException] {
+    val ex = intercept[Exception] {
       df.select(map_from_arrays(concat($"k1", $"k2"), $"v")).show()
     }
     assert(ex.getMessage.contains("Cannot use null as map key"))
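
The intercept[RuntimeException] -> intercept[Exception] changes above follow from the same conf change: with the local-relation collapse disabled, the failing expression is evaluated when the query actually runs rather than while the plan is being optimized, so the underlying RuntimeException may arrive wrapped (for example in a SparkException). Matching on the broad Exception type and asserting on the message keeps the tests stable either way. A sketch of the pattern, mirroring the first hunk in this file (assumes a ScalaTest suite with SharedSQLContext and testImplicits in scope):

    import org.apache.spark.sql.functions.map_from_arrays

    val df = Seq((Seq("a", null), Seq(1, 2))).toDF("k", "v")
    val msg = intercept[Exception] {
      df.select(map_from_arrays($"k", $"v")).collect()
    }.getMessage
    // Pin the behaviour via the message rather than the concrete exception class.
    assert(msg.contains("Cannot use null as map key"))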

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7bc5af/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d43fcf3..45b17b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContex
 import org.apache.spark.sql.test.SQLTestData.{NullInts, NullStrings, TestData2}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.random.XORShiftRandom
 
 class DataFrameSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -1729,10 +1730,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-9083: sort with non-deterministic expressions") {
-    import org.apache.spark.util.random.XORShiftRandom
-
     val seed = 33
-    val df = (1 to 100).map(Tuple1.apply).toDF("i")
+    val df = (1 to 100).map(Tuple1.apply).toDF("i").repartition(1)
     val random = new XORShiftRandom(seed)
     val expected = (1 to 100).map(_ -> random.nextDouble()).sortBy(_._2).map(_._1)
     val actual = df.sort(rand(seed)).collect().map(_.getInt(0))

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7bc5af/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index 8968dbf..e7e0ce6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -39,6 +40,11 @@ trait SharedSparkSession
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+      // Disable ConvertToLocalRelation for better test coverage. Test cases built on
+      // LocalRelation will exercise the optimization rules better by disabling it as
+      // this rule may potentially block testing of other optimization rules such as
+      // ConstantPropagation etc.
+      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
   }
 
   /**
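
A rough illustration of what the new SharedSparkSession default means for an individual suite, and how a single test can opt out of it (hypothetical test body; assumes testImplicits and SQLTestUtils.withSQLConf are in scope):

    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    import org.apache.spark.sql.internal.SQLConf

    // With ConvertToLocalRelation excluded, the Filter survives optimization
    // instead of the whole query being folded into a LocalRelation, so rules
    // that rewrite filters and projections are actually exercised.
    val df = Seq(1, 2, 3).toDF("i").filter($"i" > 1)
    assert(!df.queryExecution.optimizedPlan.isInstanceOf[LocalRelation])

    // Clearing the exclusion list re-enables the rule for a specific test.
    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
      // Queries in this block are optimized with the default rule set.
    }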

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7bc5af/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ee3f99a..71f15a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener}
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.CacheTableCommand
@@ -59,7 +60,12 @@ object TestHive
         .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
         // SPARK-8910
         .set("spark.ui.enabled", "false")
-        .set("spark.unsafe.exceptionOnMemoryLeak", "true")))
+        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+        // Disable ConvertToLocalRelation for better test coverage. Test cases built on
+        // LocalRelation will exercise the optimization rules better by disabling it as
+        // this rule may potentially block testing of other optimization rules such as
+        // ConstantPropagation etc.
+        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)))
 
 
 case class TestHiveVersion(hiveClient: HiveClient)

