This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 515dfc166a9 [SPARK-44496][SQL][CONNECT] Move Interfaces needed by SCSC 
to sql/api
515dfc166a9 is described below

commit 515dfc166a95ecc8decce0f0cd99e06fe395f94f
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Sat Jul 22 19:53:04 2023 -0400

    [SPARK-44496][SQL][CONNECT] Move Interfaces needed by SCSC to sql/api
    
    ### What changes were proposed in this pull request?
    
    This PR moves some interfaces and utilities that are needed by the Spark Connect Scala client (SCSC) to 
sql/api.
    
    ### Why are the changes needed?
    
    So that the Scala client does not need to depend on Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT
    
    Closes #42092 from amaliujia/row_coder.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/api/java/function/CoGroupFunction.java   |  0
 .../api/java/function/DoubleFlatMapFunction.java   |  0
 .../spark/api/java/function/DoubleFunction.java    |  0
 .../spark/api/java/function/FilterFunction.java    |  0
 .../spark/api/java/function/FlatMapFunction.java   |  0
 .../spark/api/java/function/FlatMapFunction2.java  |  0
 .../api/java/function/FlatMapGroupsFunction.java   |  0
 .../spark/api/java/function/ForeachFunction.java   |  0
 .../java/function/ForeachPartitionFunction.java    |  0
 .../apache/spark/api/java/function/Function.java   |  0
 .../apache/spark/api/java/function/Function0.java  |  0
 .../apache/spark/api/java/function/Function2.java  |  0
 .../apache/spark/api/java/function/Function3.java  |  0
 .../apache/spark/api/java/function/Function4.java  |  0
 .../spark/api/java/function/MapFunction.java       |  0
 .../spark/api/java/function/MapGroupsFunction.java |  0
 .../api/java/function/MapPartitionsFunction.java   |  0
 .../api/java/function/PairFlatMapFunction.java     |  0
 .../spark/api/java/function/PairFunction.java      |  0
 .../spark/api/java/function/ReduceFunction.java    |  0
 .../spark/api/java/function/VoidFunction.java      |  0
 .../spark/api/java/function/VoidFunction2.java     |  0
 .../spark/api/java/function/package-info.java      |  0
 .../apache/spark/api/java/function/package.scala   |  0
 .../org/apache/spark/util/SparkClassUtils.scala    |  4 ++
 connector/connect/client/jvm/pom.xml               |  5 ++
 .../main/scala/org/apache/spark/sql/Column.scala   |  4 +-
 .../org/apache/spark/sql/DataFrameReader.scala     |  8 +--
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 12 ++---
 .../main/scala/org/apache/spark/util/Utils.scala   |  1 -
 project/MimaExcludes.scala                         | 29 +++++++++++
 .../main/scala/org/apache/spark/sql/Encoder.scala  |  0
 .../src/main/scala/org/apache/spark/sql/Row.scala  |  6 +--
 .../scala/org/apache/spark/sql/SqlApiConf.scala    |  2 +
 .../sql/catalyst/encoders/AgnosticEncoder.scala    |  4 +-
 .../sql/catalyst/expressions/GenericRow.scala      | 25 ++++++---
 .../sql/catalyst/expressions/OrderUtils.scala      | 25 +++++----
 .../sql/catalyst/util/SparkCharVarcharUtils.scala  | 60 ++++++++++++++++++++++
 .../apache/spark/sql/errors/DataTypeErrors.scala   | 28 ++++++++++
 .../spark/sql/catalyst/expressions/ordering.scala  |  9 +---
 .../spark/sql/catalyst/expressions/rows.scala      | 21 --------
 .../spark/sql/catalyst/util/CharVarcharUtils.scala | 38 +-------------
 .../spark/sql/errors/QueryCompilationErrors.scala  |  8 +--
 .../spark/sql/errors/QueryExecutionErrors.scala    | 10 +---
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 28 +++++-----
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  4 +-
 .../sql/test/DataFrameReaderWriterSuite.scala      | 12 ++---
 47 files changed, 203 insertions(+), 140 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
 
b/common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
similarity index 100%
copy from 
core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
copy to 
common/utils/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
 
b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
 
b/common/utils/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/Function.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/Function.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function0.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/Function0.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function0.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/Function0.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function2.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/Function2.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function2.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/Function2.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function3.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/Function3.java
similarity index 100%
copy from core/src/main/java/org/apache/spark/api/java/function/Function3.java
copy to 
common/utils/src/main/java/org/apache/spark/api/java/function/Function3.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function4.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/Function4.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function4.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/Function4.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/MapFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/MapFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
 
b/common/utils/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
 
b/common/utils/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/PairFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/PairFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/package-info.java 
b/common/utils/src/main/java/org/apache/spark/api/java/function/package-info.java
similarity index 100%
rename from 
core/src/main/java/org/apache/spark/api/java/function/package-info.java
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/package-info.java
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/package.scala 
b/common/utils/src/main/java/org/apache/spark/api/java/function/package.scala
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/package.scala
rename to 
common/utils/src/main/java/org/apache/spark/api/java/function/package.scala
diff --git 
a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala 
b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
index 7401e376241..a237869aef3 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
@@ -16,9 +16,13 @@
  */
 package org.apache.spark.util
 
+import java.util.Random
+
 import scala.util.Try
 
 trait SparkClassUtils {
+  val random = new Random()
+
   def getSparkClassLoader: ClassLoader = getClass.getClassLoader
 
   def getContextOrSparkClassLoader: ClassLoader =
diff --git a/connector/connect/client/jvm/pom.xml 
b/connector/connect/client/jvm/pom.xml
index 60ed0f3ba46..6605496a165 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -64,6 +64,11 @@
       <artifactId>spark-common-utils_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sketch_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
index 6a660a7482e..4a527040d80 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.connect.proto.Expression.SortOrder.NullOrdering
 import org.apache.spark.connect.proto.Expression.SortOrder.SortDirection
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
@@ -1087,7 +1087,7 @@ class Column private[sql] (@DeveloperApi val expr: 
proto.Expression) extends Log
    * @group expr_ops
    * @since 3.4.0
    */
-  def cast(to: String): Column = cast(CatalystSqlParser.parseDataType(to))
+  def cast(to: String): Column = cast(DataTypeParser.parseDataType(to))
 
   /**
    * Returns a sort expression based on the descending order of the column.
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 40f9ac1df2b..10d2af094a0 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -25,9 +25,9 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.connect.proto.Parse.ParseFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
SparkCharVarcharUtils}
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -58,7 +58,7 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
    */
   def schema(schema: StructType): DataFrameReader = {
     if (schema != null) {
-      val replaced = 
CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+      val replaced = 
SparkCharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
       this.userSpecifiedSchema = Option(replaced)
     }
     this
@@ -563,7 +563,7 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
    */
   private def assertNoSpecifiedSchema(operation: String): Unit = {
     if (userSpecifiedSchema.nonEmpty) {
-      throw 
QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
+      throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
     }
   }
 
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 47e361c9679..b0dd91293a0 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.function._
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
-import org.apache.spark.sql.catalyst.expressions.RowOrdering
+import org.apache.spark.sql.catalyst.expressions.OrderUtils
 import org.apache.spark.sql.connect.client.SparkResult
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
StorageLevelProtoConverter, UdfUtils}
 import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
@@ -37,7 +37,7 @@ import org.apache.spark.sql.functions.{struct, to_json}
 import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types.{Metadata, StructType}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkClassUtils
 
 /**
  * A Dataset is a strongly typed collection of domain-specific objects that 
can be transformed in
@@ -876,7 +876,7 @@ class Dataset[T] private[sql] (
 
     val tupleEncoder =
       ProductEncoder[(T, U)](
-        
ClassTag(Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
+        
ClassTag(SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
         Seq(
           EncoderField(s"_1", this.encoder, leftNullable, Metadata.empty),
           EncoderField(s"_2", other.encoder, rightNullable, Metadata.empty)))
@@ -2035,7 +2035,7 @@ class Dataset[T] private[sql] (
    * @since 3.4.0
    */
   def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = {
-    sample(withReplacement, fraction, Utils.random.nextLong)
+    sample(withReplacement, fraction, SparkClassUtils.random.nextLong)
   }
 
   /**
@@ -2069,7 +2069,7 @@ class Dataset[T] private[sql] (
     //  between construction and execution the query might fail or produce 
wrong results. Another
     //  problem can come from data that arrives between the execution of the 
returned datasets.
     val sortOrder = schema.collect {
-      case f if RowOrdering.isOrderable(f.dataType) => col(f.name).asc
+      case f if OrderUtils.isOrderable(f.dataType) => col(f.name).asc
     }
     val sortedInput = sortWithinPartitions(sortOrder: _*).plan.getRoot
     val sum = weights.sum
@@ -2114,7 +2114,7 @@ class Dataset[T] private[sql] (
    * @since 3.4.0
    */
   def randomSplit(weights: Array[Double]): Array[Dataset[T]] = {
-    randomSplit(weights, Utils.random.nextLong)
+    randomSplit(weights, SparkClassUtils.random.nextLong)
   }
 
   private def withColumns(names: Seq[String], values: Seq[Column]): DataFrame 
= {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5c9eea7a151..2d61b1b6305 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -92,7 +92,6 @@ private[spark] object CallSite {
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging with SparkClassUtils {
-  val random = new Random()
 
   private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
   @volatile private var cachedLocalDir: String = ""
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 35425017ea5..729c41755de 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -174,6 +174,35 @@ object MimaExcludes {
     
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.types.ObjectType"),
     
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.types.ObjectType$"),
 
+    // SPARK-44496: Move Interfaces needed by SCSC to sql/api.
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoder"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Row"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Row$"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.package"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.package$"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.CoGroupFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.DoubleFlatMapFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.DoubleFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FilterFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapFunction2"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapGroupsFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ForeachFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ForeachPartitionFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function0"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function2"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function3"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function4"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapGroupsFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapPartitionsFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.PairFlatMapFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.PairFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ReduceFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.VoidFunction"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.VoidFunction2"),
+
     (problem: Problem) => problem match {
       case MissingClassProblem(cls) => 
!cls.fullName.startsWith("org.sparkproject.jpmml") &&
           !cls.fullName.startsWith("org.sparkproject.dmg.pmml")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/Encoder.scala
similarity index 100%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/Encoder.scala
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/Row.scala
similarity index 98%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/Row.scala
index d223da6503f..1765a5444df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Row.scala
@@ -22,7 +22,7 @@ import scala.util.hashing.MurmurHash3
 
 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.types._
 
 /**
@@ -366,7 +366,7 @@ trait Row extends Serializable {
    * @throws IllegalArgumentException when a field `name` does not exist.
    */
   def fieldIndex(name: String): Int = {
-    throw QueryExecutionErrors.fieldIndexOnRowWithoutSchemaError()
+    throw DataTypeErrors.fieldIndexOnRowWithoutSchemaError()
   }
 
   /**
@@ -511,6 +511,6 @@ trait Row extends Serializable {
    * @throws NullPointerException when value is null.
    */
   private def getAnyValAs[T <: AnyVal](i: Int): T =
-    if (isNullAt(i)) throw QueryExecutionErrors.valueIsNullError(i)
+    if (isNullAt(i)) throw DataTypeErrors.valueIsNullError(i)
     else getAs[T](i)
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
index c297a2e067a..48efa510666 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
@@ -40,6 +40,7 @@ private[sql] trait SqlApiConf {
   def doubleQuotedIdentifiers: Boolean
   def timestampType: AtomicType
   def allowNegativeScaleOfDecimalEnabled: Boolean
+  def charVarcharAsString: Boolean
 }
 
 private[sql] object SqlApiConf {
@@ -74,4 +75,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
   override def doubleQuotedIdentifiers: Boolean = false
   override def timestampType: AtomicType = TimestampType
   override def allowNegativeScaleOfDecimalEnabled: Boolean = false
+  override def charVarcharAsString: Boolean = false
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
similarity index 98%
rename from 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
rename to 
sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
index e2c6452b7df..99f33214d20 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
@@ -26,7 +26,7 @@ import scala.reflect.{classTag, ClassTag}
 import org.apache.spark.sql.{Encoder, Row}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkClassUtils
 
 /**
  * A non implementation specific encoder. This encoder containers all the 
information needed
@@ -122,7 +122,7 @@ object AgnosticEncoders {
         case (e, id) => EncoderField(s"_${id + 1}", e, e.nullable, 
Metadata.empty)
       }
       val cls = cachedCls.computeIfAbsent(encoders.size,
-        _ => 
Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}"))
+        _ => 
SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}"))
       ProductEncoder[Any](ClassTag(cls), fields)
     }
 
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function3.java 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
similarity index 54%
rename from core/src/main/java/org/apache/spark/api/java/function/Function3.java
rename to 
sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
index 77acd21d4ef..f4b45f5928e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
@@ -14,15 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.sql.catalyst.expressions
 
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
+import org.apache.spark.sql.Row
 
 /**
- * A three-argument function that takes arguments of type T1, T2 and T3 and 
returns an R.
+ * A row implementation that uses an array of objects as the underlying 
storage.  Note that, while
+ * the array is not copied, and thus could technically be mutated after 
creation, this is not
+ * allowed.
  */
-@FunctionalInterface
-public interface Function3<T1, T2, T3, R> extends Serializable {
-  R call(T1 v1, T2 v2, T3 v3) throws Exception;
+class GenericRow(protected[sql] val values: Array[Any]) extends Row {
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null)
+
+  def this(size: Int) = this(new Array[Any](size))
+
+  override def length: Int = values.length
+
+  override def get(i: Int): Any = values(i)
+
+  override def toSeq: Seq[Any] = values.clone()
+
+  override def copy(): GenericRow = this
 }
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
similarity index 56%
rename from 
core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
rename to 
sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
index a6f69f7cdca..9319b104024 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
@@ -14,17 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.sql.catalyst.expressions
 
-package org.apache.spark.api.java.function;
+import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, NullType, 
StructType, UserDefinedType}
 
-import java.io.Serializable;
-
-/**
- * Base interface for a function used in Dataset's filter function.
- *
- * If the function returns true, the element is included in the returned 
Dataset.
- */
-@FunctionalInterface
-public interface FilterFunction<T> extends Serializable {
-  boolean call(T value) throws Exception;
+object OrderUtils {
+  /**
+   * Returns true iff the data type can be ordered (i.e. can be sorted).
+   */
+  def isOrderable(dataType: DataType): Boolean = dataType match {
+    case NullType => true
+    case dt: AtomicType => true
+    case struct: StructType => struct.fields.forall(f => 
isOrderable(f.dataType))
+    case array: ArrayType => isOrderable(array.elementType)
+    case udt: UserDefinedType[_] => isOrderable(udt.sqlType)
+    case _ => false
+  }
 }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala
new file mode 100644
index 00000000000..47b3ef9b05a
--- /dev/null
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.SqlApiConf
+import org.apache.spark.sql.errors.DataTypeErrors
+import org.apache.spark.sql.types.{ArrayType, CharType, DataType, MapType, 
StringType, StructType, VarcharType}
+
+trait SparkCharVarcharUtils {
+  /**
+   * Returns true if the given data type is CharType/VarcharType or has nested 
CharType/VarcharType.
+   */
+  def hasCharVarchar(dt: DataType): Boolean = {
+    dt.existsRecursively(f => f.isInstanceOf[CharType] || 
f.isInstanceOf[VarcharType])
+  }
+
+  /**
+   * Validate the given [[DataType]] to fail if it is char or varchar types or 
contains nested ones
+   */
+  def failIfHasCharVarchar(dt: DataType): DataType = {
+    if (!SqlApiConf.get.charVarcharAsString && hasCharVarchar(dt)) {
+      throw DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError()
+    } else {
+      replaceCharVarcharWithString(dt)
+    }
+  }
+
+  /**
+   * Replaces CharType/VarcharType with StringType recursively in the given 
data type.
+   */
+  def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
+    case ArrayType(et, nullable) =>
+      ArrayType(replaceCharVarcharWithString(et), nullable)
+    case MapType(kt, vt, nullable) =>
+      MapType(replaceCharVarcharWithString(kt), 
replaceCharVarcharWithString(vt), nullable)
+    case StructType(fields) =>
+      StructType(fields.map { field =>
+        field.copy(dataType = replaceCharVarcharWithString(field.dataType))
+      })
+    case _: CharType => StringType
+    case _: VarcharType => StringType
+    case _ => dt
+  }
+}
+
+object SparkCharVarcharUtils extends SparkCharVarcharUtils
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
index 69156e11658..fcc3086b573 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
@@ -267,4 +267,32 @@ private[sql] object DataTypeErrors extends 
DataTypeErrorsBase {
       errorClass = "FAILED_PARSE_STRUCT_TYPE",
       messageParameters = Map("raw" -> s"'$raw'"))
   }
+
+  def fieldIndexOnRowWithoutSchemaError(): SparkUnsupportedOperationException 
= {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2231",
+      messageParameters = Map.empty)
+  }
+
+  def valueIsNullError(index: Int): Throwable = {
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2232",
+      messageParameters = Map(
+        "index" -> index.toString),
+      cause = null)
+  }
+
+  def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
+    new SparkException(
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
+      messageParameters = Map.empty,
+      cause = null)
+  }
+
+  def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_1189",
+      messageParameters = Map("operation" -> operation),
+      cause = null)
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index ab68eb266cb..41c68d439a2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -91,14 +91,7 @@ object RowOrdering extends 
CodeGeneratorWithInterpretedFallback[Seq[SortOrder],
   /**
    * Returns true iff the data type can be ordered (i.e. can be sorted).
    */
-  def isOrderable(dataType: DataType): Boolean = dataType match {
-    case NullType => true
-    case dt: AtomicType => true
-    case struct: StructType => struct.fields.forall(f => 
isOrderable(f.dataType))
-    case array: ArrayType => isOrderable(array.elementType)
-    case udt: UserDefinedType[_] => isOrderable(udt.sqlType)
-    case _ => false
-  }
+  def isOrderable(dataType: DataType): Boolean = 
OrderUtils.isOrderable(dataType)
 
   /**
    * Returns true iff outputs from the expressions can be ordered.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 65539a2f00e..10c33a43270 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types._
@@ -158,26 +157,6 @@ trait BaseGenericInternalRow extends InternalRow {
   }
 }
 
-/**
- * A row implementation that uses an array of objects as the underlying 
storage.  Note that, while
- * the array is not copied, and thus could technically be mutated after 
creation, this is not
- * allowed.
- */
-class GenericRow(protected[sql] val values: Array[Any]) extends Row {
-  /** No-arg constructor for serialization. */
-  protected def this() = this(null)
-
-  def this(size: Int) = this(new Array[Any](size))
-
-  override def length: Int = values.length
-
-  override def get(i: Int): Any = values(i)
-
-  override def toSeq: Seq[Any] = values.clone()
-
-  override def copy(): GenericRow = this
-}
-
 class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
   extends GenericRow(values) {
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
index 44810634358..b9d83d44490 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
@@ -23,11 +23,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-object CharVarcharUtils extends Logging {
+object CharVarcharUtils extends Logging with SparkCharVarcharUtils {
 
   private val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = 
"__CHAR_VARCHAR_TYPE_STRING"
 
@@ -49,41 +48,6 @@ object CharVarcharUtils extends Logging {
     })
   }
 
-  /**
-   * Returns true if the given data type is CharType/VarcharType or has nested 
CharType/VarcharType.
-   */
-  def hasCharVarchar(dt: DataType): Boolean = {
-    dt.existsRecursively(f => f.isInstanceOf[CharType] || 
f.isInstanceOf[VarcharType])
-  }
-
-  /**
-   * Validate the given [[DataType]] to fail if it is char or varchar types or 
contains nested ones
-   */
-  def failIfHasCharVarchar(dt: DataType): DataType = {
-    if (!SQLConf.get.charVarcharAsString && hasCharVarchar(dt)) {
-      throw QueryCompilationErrors.charOrVarcharTypeAsStringUnsupportedError()
-    } else {
-      replaceCharVarcharWithString(dt)
-    }
-  }
-
-  /**
-   * Replaces CharType/VarcharType with StringType recursively in the given 
data type.
-   */
-  def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
-    case ArrayType(et, nullable) =>
-      ArrayType(replaceCharVarcharWithString(et), nullable)
-    case MapType(kt, vt, nullable) =>
-      MapType(replaceCharVarcharWithString(kt), 
replaceCharVarcharWithString(vt), nullable)
-    case StructType(fields) =>
-      StructType(fields.map { field =>
-        field.copy(dataType = replaceCharVarcharWithString(field.dataType))
-      })
-    case _: CharType => StringType
-    case _: VarcharType => StringType
-    case _ => dt
-  }
-
   /**
    * Replaces CharType with VarcharType recursively in the given data type.
    */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6e809d5bbf0..8120eb9b57e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2008,9 +2008,7 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
-    new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1189",
-      messageParameters = Map("operation" -> operation))
+    DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
   }
 
   def tempViewNotSupportStreamingWriteError(viewName: String): Throwable = {
@@ -2380,9 +2378,7 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
-    new AnalysisException(
-      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
-      messageParameters = Map.empty)
+    DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError()
   }
 
   def escapeCharacterInTheMiddleError(pattern: String, char: String): 
Throwable = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 983648ff673..183c5425202 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2194,17 +2194,11 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
   }
 
   def fieldIndexOnRowWithoutSchemaError(): SparkUnsupportedOperationException 
= {
-    new SparkUnsupportedOperationException(
-      errorClass = "_LEGACY_ERROR_TEMP_2231",
-      messageParameters = Map.empty)
+    DataTypeErrors.fieldIndexOnRowWithoutSchemaError()
   }
 
   def valueIsNullError(index: Int): Throwable = {
-    new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2232",
-      messageParameters = Map(
-        "index" -> toSQLValue(index, IntegerType)),
-      cause = null)
+    DataTypeErrors.valueIsNullError(index)
   }
 
   def onlySupportDataSourcesProvidingFileFormatError(providingClass: String): 
Throwable = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 4a7632486c0..55009f2416c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -794,15 +794,11 @@ class BasicCharVarcharTestSuite extends QueryTest with 
SharedSparkSession {
 
   test("invalidate char/varchar in functions") {
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
-      parameters = Map.empty,
-      context = ExpectedContext(
-        fragment = "from_json('{\"a\": \"str\"}', 'a CHAR(5)')",
-        start = 7,
-        stop = 44)
+      parameters = Map.empty
     )
     withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
       val df = sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
@@ -816,19 +812,19 @@ class BasicCharVarcharTestSuite extends QueryTest with 
SharedSparkSession {
     val df = spark.range(10).map(_.toString).toDF()
     val schema = new StructType().add("id", CharType(5))
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.createDataFrame(df.collectAsList(), schema)
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
     )
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.createDataFrame(df.rdd, schema)
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
     )
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.createDataFrame(df.toJavaRDD, schema)
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -842,12 +838,12 @@ class BasicCharVarcharTestSuite extends QueryTest with 
SharedSparkSession {
 
   test("invalidate char/varchar in spark.read.schema") {
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.read.schema(new StructType().add("id", CharType(5)))
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING")
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.read.schema("id char(5)")
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -884,13 +880,13 @@ class BasicCharVarcharTestSuite extends QueryTest with 
SharedSparkSession {
 
   test("invalidate char/varchar in udf's result type") {
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.udf.register("testchar", () => "B", VarcharType(1))
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
     )
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.udf.register("testchar2", (x: String) => x, VarcharType(1))
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -909,13 +905,13 @@ class BasicCharVarcharTestSuite extends QueryTest with 
SharedSparkSession {
 
   test("invalidate char/varchar in spark.readStream.schema") {
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.readStream.schema(new StructType().add("id", CharType(5)))
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
     )
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.readStream.schema("id char(5)")
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -938,7 +934,7 @@ class BasicCharVarcharTestSuite extends QueryTest with 
SharedSparkSession {
       sql("CREATE TABLE t(c char(10), v varchar(255)) USING parquet")
       sql("INSERT INTO t VALUES('spark', 'awesome')")
       val df = sql("SELECT * FROM t")
-      checkError(exception = intercept[AnalysisException] {
+      checkError(exception = intercept[SparkException] {
         df.to(newSchema)
       }, errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING", parameters = 
Map.empty)
       withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 93b6652d516..94c344fc384 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1364,12 +1364,12 @@ class JDBCSuite extends QueryTest with 
SharedSparkSession {
     val schema = StructType(Seq(StructField("name", StringType, false, 
defaultMetadata),
       StructField("theid", IntegerType, false, defaultMetadata)))
     val parts = Array[String]("THEID < 2", "THEID >= 2")
-    val e1 = intercept[AnalysisException] {
+    val e1 = intercept[SparkException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, 
new Properties())
     }.getMessage
     assert(e1.contains("User specified schema not supported with `jdbc`"))
 
-    val e2 = intercept[AnalysisException] {
+    val e2 = intercept[SparkException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new 
Properties())
     }.getMessage
     assert(e2.contains("User specified schema not supported with `jdbc`"))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 17348fe2dcb..967d384d979 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -32,7 +32,7 @@ import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkContext, TestUtils}
+import org.apache.spark.{SparkContext, SparkException, TestUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -633,12 +633,12 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSparkSession with
     testRead(Option(dir).map(spark.read.text).get, data, textSchema)
 
     // Reader, with user specified schema, should just apply user schema on 
the file data
-    val e = intercept[AnalysisException] { 
spark.read.schema(userSchema).textFile() }
+    val e = intercept[SparkException] { 
spark.read.schema(userSchema).textFile() }
     assert(e.getMessage.toLowerCase(Locale.ROOT).contains(
       "user specified schema not supported"))
-    intercept[AnalysisException] { spark.read.schema(userSchema).textFile(dir) 
}
-    intercept[AnalysisException] { spark.read.schema(userSchema).textFile(dir, 
dir) }
-    intercept[AnalysisException] { 
spark.read.schema(userSchema).textFile(Seq(dir, dir): _*) }
+    intercept[SparkException] { spark.read.schema(userSchema).textFile(dir) }
+    intercept[SparkException] { spark.read.schema(userSchema).textFile(dir, 
dir) }
+    intercept[SparkException] { 
spark.read.schema(userSchema).textFile(Seq(dir, dir): _*) }
   }
 
   test("csv - API and behavior regarding schema") {
@@ -969,7 +969,7 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSparkSession with
   test("SPARK-16848: table API throws an exception for user specified schema") 
{
     withTable("t") {
       val schema = StructType(StructField("a", StringType) :: Nil)
-      val e = intercept[AnalysisException] {
+      val e = intercept[SparkException] {
         spark.read.schema(schema).table("t")
       }.getMessage
       assert(e.contains("User specified schema not supported with `table`"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to