This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new b9d6b9a2658 [SPARK-44496][SQL][CONNECT] Move Interfaces needed by SCSC to sql/api
b9d6b9a2658 is described below
commit b9d6b9a26589100db682bb6b6d66eb0fb49df85e
Author: Rui Wang <[email protected]>
AuthorDate: Sat Jul 22 19:53:04 2023 -0400
[SPARK-44496][SQL][CONNECT] Move Interfaces needed by SCSC to sql/api
### What changes were proposed in this pull request?
This PR moves some interfaces and utils that are needed by the Scala client to sql/api.
### Why are the changes needed?
So that the Scala client does not need to depend on Spark.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
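
For context, a minimal sketch (illustrative, not part of the commit) of what the move
enables -- code like the following compiles against sql/api and common/utils alone,
with no spark-catalyst or spark-core on the classpath:

    import org.apache.spark.api.java.function.MapFunction
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRow

    val row: Row = new GenericRow(Array[Any]("spark", 42))
    // The Java functional interfaces moved to common/utils work as Scala SAMs:
    val firstLen: MapFunction[Row, Int] = (r: Row) => r.getString(0).length
    println(firstLen.call(row)) // prints 5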
Closes #42092 from amaliujia/row_coder.
Authored-by: Rui Wang <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 515dfc166a95ecc8decce0f0cd99e06fe395f94f)
Signed-off-by: Herman van Hovell <[email protected]>
---
.../spark/api/java/function/CoGroupFunction.java | 0
.../api/java/function/DoubleFlatMapFunction.java | 0
.../spark/api/java/function/DoubleFunction.java | 0
.../spark/api/java/function/FilterFunction.java | 0
.../spark/api/java/function/FlatMapFunction.java | 0
.../spark/api/java/function/FlatMapFunction2.java | 0
.../api/java/function/FlatMapGroupsFunction.java | 0
.../spark/api/java/function/ForeachFunction.java | 0
.../java/function/ForeachPartitionFunction.java | 0
.../apache/spark/api/java/function/Function.java | 0
.../apache/spark/api/java/function/Function0.java | 0
.../apache/spark/api/java/function/Function2.java | 0
.../apache/spark/api/java/function/Function3.java | 0
.../apache/spark/api/java/function/Function4.java | 0
.../spark/api/java/function/MapFunction.java | 0
.../spark/api/java/function/MapGroupsFunction.java | 0
.../api/java/function/MapPartitionsFunction.java | 0
.../api/java/function/PairFlatMapFunction.java | 0
.../spark/api/java/function/PairFunction.java | 0
.../spark/api/java/function/ReduceFunction.java | 0
.../spark/api/java/function/VoidFunction.java | 0
.../spark/api/java/function/VoidFunction2.java | 0
.../spark/api/java/function/package-info.java | 0
.../apache/spark/api/java/function/package.scala | 0
.../org/apache/spark/util/SparkClassUtils.scala | 4 ++
connector/connect/client/jvm/pom.xml | 5 ++
.../main/scala/org/apache/spark/sql/Column.scala | 4 +-
.../org/apache/spark/sql/DataFrameReader.scala | 8 +--
.../main/scala/org/apache/spark/sql/Dataset.scala | 12 ++---
.../main/scala/org/apache/spark/util/Utils.scala | 1 -
project/MimaExcludes.scala | 29 +++++++++++
.../main/scala/org/apache/spark/sql/Encoder.scala | 0
.../src/main/scala/org/apache/spark/sql/Row.scala | 6 +--
.../scala/org/apache/spark/sql/SqlApiConf.scala | 2 +
.../sql/catalyst/encoders/AgnosticEncoder.scala | 4 +-
.../sql/catalyst/expressions/GenericRow.scala | 25 ++++++---
.../sql/catalyst/expressions/OrderUtils.scala | 25 +++++----
.../sql/catalyst/util/SparkCharVarcharUtils.scala | 60 ++++++++++++++++++++++
.../apache/spark/sql/errors/DataTypeErrors.scala | 28 ++++++++++
.../spark/sql/catalyst/expressions/ordering.scala | 9 +---
.../spark/sql/catalyst/expressions/rows.scala | 21 --------
.../spark/sql/catalyst/util/CharVarcharUtils.scala | 38 +-------------
.../spark/sql/errors/QueryCompilationErrors.scala | 8 +--
.../spark/sql/errors/QueryExecutionErrors.scala | 10 +---
.../apache/spark/sql/CharVarcharTestSuite.scala | 28 +++++-----
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 4 +-
.../sql/test/DataFrameReaderWriterSuite.scala | 12 ++---
47 files changed, 203 insertions(+), 140 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
similarity index 100%
copy from core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
copy to common/utils/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/Function.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function0.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function0.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/Function0.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function2.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function2.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/Function2.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function3.java
similarity index 100%
copy from core/src/main/java/org/apache/spark/api/java/function/Function3.java
copy to common/utils/src/main/java/org/apache/spark/api/java/function/Function3.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function4.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function4.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/Function4.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/MapFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/MapFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/PairFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/PairFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/package-info.java b/common/utils/src/main/java/org/apache/spark/api/java/function/package-info.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/package-info.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/package-info.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/package.scala b/common/utils/src/main/java/org/apache/spark/api/java/function/package.scala
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/package.scala
rename to common/utils/src/main/java/org/apache/spark/api/java/function/package.scala
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
index 7401e376241..a237869aef3 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
@@ -16,9 +16,13 @@
*/
package org.apache.spark.util
+import java.util.Random
+
import scala.util.Try
trait SparkClassUtils {
+ val random = new Random()
+
def getSparkClassLoader: ClassLoader = getClass.getClassLoader
def getContextOrSparkClassLoader: ClassLoader =
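(Illustrative note, not from the commit: the shared `random` moves onto this trait so
that sql/api-only code can obtain seeds without core's Utils, e.g.:

    import org.apache.spark.util.SparkClassUtils
    val seed: Long = SparkClassUtils.random.nextLong()

as the Dataset changes further below now do.)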
diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml
index 0f6783cbd68..f60e60890c0 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -64,6 +64,11 @@
<artifactId>spark-common-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sketch_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
index 6a660a7482e..4a527040d80 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
@@ -24,7 +24,7 @@ import org.apache.spark.connect.proto.Expression.SortOrder.NullOrdering
import org.apache.spark.connect.proto.Expression.SortOrder.SortDirection
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit
@@ -1087,7 +1087,7 @@ class Column private[sql] (@DeveloperApi val expr: proto.Expression) extends Log
* @group expr_ops
* @since 3.4.0
*/
- def cast(to: String): Column = cast(CatalystSqlParser.parseDataType(to))
+ def cast(to: String): Column = cast(DataTypeParser.parseDataType(to))
/**
* Returns a sort expression based on the descending order of the column.
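(Illustrative sketch: DataTypeParser lives in sql/api, so the client can resolve type
strings without CatalystSqlParser:

    import org.apache.spark.sql.catalyst.parser.DataTypeParser
    import org.apache.spark.sql.types.{ArrayType, IntegerType}

    val dt = DataTypeParser.parseDataType("array<int>")
    assert(dt == ArrayType(IntegerType)) // the path cast("array<int>") now parses through
)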
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 40f9ac1df2b..10d2af094a0 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -25,9 +25,9 @@ import org.apache.spark.annotation.Stable
import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, SparkCharVarcharUtils}
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types.StructType
/**
@@ -58,7 +58,7 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
*/
def schema(schema: StructType): DataFrameReader = {
if (schema != null) {
- val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+ val replaced = SparkCharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
this.userSpecifiedSchema = Option(replaced)
}
this
@@ -563,7 +563,7 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
*/
private def assertNoSpecifiedSchema(operation: String): Unit = {
if (userSpecifiedSchema.nonEmpty) {
- throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
+ throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
}
}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 47e361c9679..b0dd91293a0 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.function._
import org.apache.spark.connect.proto
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
-import org.apache.spark.sql.catalyst.expressions.RowOrdering
+import org.apache.spark.sql.catalyst.expressions.OrderUtils
import org.apache.spark.sql.connect.client.SparkResult
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter, UdfUtils}
import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
@@ -37,7 +37,7 @@ import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types.{Metadata, StructType}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkClassUtils
/**
* A Dataset is a strongly typed collection of domain-specific objects that can be transformed in
@@ -876,7 +876,7 @@ class Dataset[T] private[sql] (
val tupleEncoder =
ProductEncoder[(T, U)](
- ClassTag(Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
+ ClassTag(SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
Seq(
EncoderField(s"_1", this.encoder, leftNullable, Metadata.empty),
EncoderField(s"_2", other.encoder, rightNullable, Metadata.empty)))
@@ -2035,7 +2035,7 @@ class Dataset[T] private[sql] (
* @since 3.4.0
*/
def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = {
- sample(withReplacement, fraction, Utils.random.nextLong)
+ sample(withReplacement, fraction, SparkClassUtils.random.nextLong)
}
/**
@@ -2069,7 +2069,7 @@ class Dataset[T] private[sql] (
// between construction and execution the query might fail or produce wrong results. Another
// problem can come from data that arrives between the execution of the returned datasets.
val sortOrder = schema.collect {
- case f if RowOrdering.isOrderable(f.dataType) => col(f.name).asc
+ case f if OrderUtils.isOrderable(f.dataType) => col(f.name).asc
}
val sortedInput = sortWithinPartitions(sortOrder: _*).plan.getRoot
val sum = weights.sum
@@ -2114,7 +2114,7 @@ class Dataset[T] private[sql] (
* @since 3.4.0
*/
def randomSplit(weights: Array[Double]): Array[Dataset[T]] = {
- randomSplit(weights, Utils.random.nextLong)
+ randomSplit(weights, SparkClassUtils.random.nextLong)
}
private def withColumns(names: Seq[String], values: Seq[Column]): DataFrame = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5c9eea7a151..2d61b1b6305 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -92,7 +92,6 @@ private[spark] object CallSite {
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging with SparkClassUtils {
- val random = new Random()
private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
@volatile private var cachedLocalDir: String = ""
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d727c8c6917..3b4f0796ec1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -170,6 +170,35 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.types.ObjectType"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.types.ObjectType$"),
+ // SPARK-44496: Move Interfaces needed by SCSC to sql/api.
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoder"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Row"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Row$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.package"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.package$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.CoGroupFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.DoubleFlatMapFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.DoubleFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FilterFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapFunction2"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapGroupsFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ForeachFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ForeachPartitionFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function0"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function2"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function3"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function4"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapGroupsFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapPartitionsFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.PairFlatMapFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.PairFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ReduceFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.VoidFunction"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.VoidFunction2"),
+
(problem: Problem) => problem match {
case MissingClassProblem(cls) =>
!cls.fullName.startsWith("org.sparkproject.jpmml") &&
!cls.fullName.startsWith("org.sparkproject.dmg.pmml")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/Encoder.scala
similarity index 100%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/Encoder.scala
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/api/src/main/scala/org/apache/spark/sql/Row.scala
similarity index 98%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/Row.scala
index d223da6503f..1765a5444df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Row.scala
@@ -22,7 +22,7 @@ import scala.util.hashing.MurmurHash3
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types._
/**
@@ -366,7 +366,7 @@ trait Row extends Serializable {
* @throws IllegalArgumentException when a field `name` does not exist.
*/
def fieldIndex(name: String): Int = {
- throw QueryExecutionErrors.fieldIndexOnRowWithoutSchemaError()
+ throw DataTypeErrors.fieldIndexOnRowWithoutSchemaError()
}
/**
@@ -511,6 +511,6 @@ trait Row extends Serializable {
* @throws NullPointerException when value is null.
*/
private def getAnyValAs[T <: AnyVal](i: Int): T =
- if (isNullAt(i)) throw QueryExecutionErrors.valueIsNullError(i)
+ if (isNullAt(i)) throw DataTypeErrors.valueIsNullError(i)
else getAs[T](i)
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
b/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
index c297a2e067a..48efa510666 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
@@ -40,6 +40,7 @@ private[sql] trait SqlApiConf {
def doubleQuotedIdentifiers: Boolean
def timestampType: AtomicType
def allowNegativeScaleOfDecimalEnabled: Boolean
+ def charVarcharAsString: Boolean
}
private[sql] object SqlApiConf {
@@ -74,4 +75,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
override def doubleQuotedIdentifiers: Boolean = false
override def timestampType: AtomicType = TimestampType
override def allowNegativeScaleOfDecimalEnabled: Boolean = false
+ override def charVarcharAsString: Boolean = false
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
similarity index 98%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
index e2c6452b7df..99f33214d20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
@@ -26,7 +26,7 @@ import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkClassUtils
/**
* A non implementation specific encoder. This encoder containers all the information needed
@@ -122,7 +122,7 @@ object AgnosticEncoders {
case (e, id) => EncoderField(s"_${id + 1}", e, e.nullable, Metadata.empty)
}
val cls = cachedCls.computeIfAbsent(encoders.size,
- _ => Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}"))
+ _ => SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}"))
ProductEncoder[Any](ClassTag(cls), fields)
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
similarity index 54%
rename from core/src/main/java/org/apache/spark/api/java/function/Function3.java
rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
index 77acd21d4ef..f4b45f5928e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
@@ -14,15 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.spark.sql.catalyst.expressions
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
+import org.apache.spark.sql.Row
/**
- * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ * A row implementation that uses an array of objects as the underlying storage. Note that, while
+ * the array is not copied, and thus could technically be mutated after creation, this is not
+ * allowed.
*/
-@FunctionalInterface
-public interface Function3<T1, T2, T3, R> extends Serializable {
- R call(T1 v1, T2 v2, T3 v3) throws Exception;
+class GenericRow(protected[sql] val values: Array[Any]) extends Row {
+ /** No-arg constructor for serialization. */
+ protected def this() = this(null)
+
+ def this(size: Int) = this(new Array[Any](size))
+
+ override def length: Int = values.length
+
+ override def get(i: Int): Any = values(i)
+
+ override def toSeq: Seq[Any] = values.clone()
+
+ override def copy(): GenericRow = this
}
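(Usage sketch of the relocated class, illustrative: the backing array is shared, and
copy() returns the same instance because mutation is disallowed:

    import org.apache.spark.sql.catalyst.expressions.GenericRow

    val r = new GenericRow(Array[Any]("a", 1, null))
    assert(r.length == 3 && r.isNullAt(2))
    assert(r.copy() eq r)                 // same instance by design
    assert(r.toSeq == Seq("a", 1, null))  // toSeq clones, so callers cannot mutate values
)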
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
similarity index 56%
rename from core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
index a6f69f7cdca..9319b104024 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
@@ -14,17 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.spark.sql.catalyst.expressions
-package org.apache.spark.api.java.function;
+import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, NullType, StructType, UserDefinedType}
-import java.io.Serializable;
-
-/**
- * Base interface for a function used in Dataset's filter function.
- *
- * If the function returns true, the element is included in the returned Dataset.
- */
-@FunctionalInterface
-public interface FilterFunction<T> extends Serializable {
- boolean call(T value) throws Exception;
+object OrderUtils {
+ /**
+ * Returns true iff the data type can be ordered (i.e. can be sorted).
+ */
+ def isOrderable(dataType: DataType): Boolean = dataType match {
+ case NullType => true
+ case dt: AtomicType => true
+ case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
+ case array: ArrayType => isOrderable(array.elementType)
+ case udt: UserDefinedType[_] => isOrderable(udt.sqlType)
+ case _ => false
+ }
}
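(Illustrative check of the extracted predicate:

    import org.apache.spark.sql.catalyst.expressions.OrderUtils
    import org.apache.spark.sql.types._

    assert(OrderUtils.isOrderable(ArrayType(LongType)))
    assert(OrderUtils.isOrderable(StructType(Seq(StructField("a", StringType)))))
    assert(!OrderUtils.isOrderable(MapType(StringType, LongType))) // maps stay unorderable
)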
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala
new file mode 100644
index 00000000000..47b3ef9b05a
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.SqlApiConf
+import org.apache.spark.sql.errors.DataTypeErrors
+import org.apache.spark.sql.types.{ArrayType, CharType, DataType, MapType, StringType, StructType, VarcharType}
+
+trait SparkCharVarcharUtils {
+ /**
+ * Returns true if the given data type is CharType/VarcharType or has nested CharType/VarcharType.
+ */
+ def hasCharVarchar(dt: DataType): Boolean = {
+ dt.existsRecursively(f => f.isInstanceOf[CharType] || f.isInstanceOf[VarcharType])
+ }
+
+ /**
+ * Validate the given [[DataType]] to fail if it is char or varchar types or contains nested ones
+ */
+ def failIfHasCharVarchar(dt: DataType): DataType = {
+ if (!SqlApiConf.get.charVarcharAsString && hasCharVarchar(dt)) {
+ throw DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError()
+ } else {
+ replaceCharVarcharWithString(dt)
+ }
+ }
+
+ /**
+ * Replaces CharType/VarcharType with StringType recursively in the given data type.
+ */
+ def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
+ case ArrayType(et, nullable) =>
+ ArrayType(replaceCharVarcharWithString(et), nullable)
+ case MapType(kt, vt, nullable) =>
+ MapType(replaceCharVarcharWithString(kt), replaceCharVarcharWithString(vt), nullable)
+ case StructType(fields) =>
+ StructType(fields.map { field =>
+ field.copy(dataType = replaceCharVarcharWithString(field.dataType))
+ })
+ case _: CharType => StringType
+ case _: VarcharType => StringType
+ case _ => dt
+ }
+}
+
+object SparkCharVarcharUtils extends SparkCharVarcharUtils
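(Usage sketch of the new shared helper, illustrative:

    import org.apache.spark.sql.catalyst.util.SparkCharVarcharUtils
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(StructField("c", CharType(5)), StructField("n", IntegerType)))
    assert(SparkCharVarcharUtils.hasCharVarchar(schema))
    assert(SparkCharVarcharUtils.replaceCharVarcharWithString(schema) ==
      StructType(Seq(StructField("c", StringType), StructField("n", IntegerType))))
)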
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
index 69156e11658..fcc3086b573 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
@@ -267,4 +267,32 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
errorClass = "FAILED_PARSE_STRUCT_TYPE",
messageParameters = Map("raw" -> s"'$raw'"))
}
+
+ def fieldIndexOnRowWithoutSchemaError(): SparkUnsupportedOperationException = {
+ new SparkUnsupportedOperationException(
+ errorClass = "_LEGACY_ERROR_TEMP_2231",
+ messageParameters = Map.empty)
+ }
+
+ def valueIsNullError(index: Int): Throwable = {
+ new SparkException(
+ errorClass = "_LEGACY_ERROR_TEMP_2232",
+ messageParameters = Map(
+ "index" -> index.toString),
+ cause = null)
+ }
+
+ def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
+ new SparkException(
+ errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
+ messageParameters = Map.empty,
+ cause = null)
+ }
+
+ def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
+ new SparkException(
+ errorClass = "_LEGACY_ERROR_TEMP_1189",
+ messageParameters = Map("operation" -> operation),
+ cause = null)
+ }
}
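(Illustrative effect: with the default SqlApiConf (charVarcharAsString = false), the
shared validation now raises SparkException from sql/api rather than catalyst's
AnalysisException, which is why the test expectations below change:

    import org.apache.spark.SparkException
    import org.apache.spark.sql.catalyst.util.SparkCharVarcharUtils
    import org.apache.spark.sql.types.{CharType, StructField, StructType}

    try SparkCharVarcharUtils.failIfHasCharVarchar(StructType(Seq(StructField("id", CharType(5)))))
    catch {
      case e: SparkException =>
        assert(e.getErrorClass == "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING")
    }
)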
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index ab68eb266cb..41c68d439a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -91,14 +91,7 @@ object RowOrdering extends CodeGeneratorWithInterpretedFallback[Seq[SortOrder],
/**
* Returns true iff the data type can be ordered (i.e. can be sorted).
*/
- def isOrderable(dataType: DataType): Boolean = dataType match {
- case NullType => true
- case dt: AtomicType => true
- case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
- case array: ArrayType => isOrderable(array.elementType)
- case udt: UserDefinedType[_] => isOrderable(udt.sqlType)
- case _ => false
- }
+ def isOrderable(dataType: DataType): Boolean = OrderUtils.isOrderable(dataType)
/**
* Returns true iff outputs from the expressions can be ordered.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 65539a2f00e..10c33a43270 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
@@ -158,26 +157,6 @@ trait BaseGenericInternalRow extends InternalRow {
}
}
-/**
- * A row implementation that uses an array of objects as the underlying storage. Note that, while
- * the array is not copied, and thus could technically be mutated after creation, this is not
- * allowed.
- */
-class GenericRow(protected[sql] val values: Array[Any]) extends Row {
- /** No-arg constructor for serialization. */
- protected def this() = this(null)
-
- def this(size: Int) = this(new Array[Any](size))
-
- override def length: Int = values.length
-
- override def get(i: Int): Any = values(i)
-
- override def toSeq: Seq[Any] = values.clone()
-
- override def copy(): GenericRow = this
-}
-
class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
extends GenericRow(values) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
index 44810634358..b9d83d44490 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
@@ -23,11 +23,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-object CharVarcharUtils extends Logging {
+object CharVarcharUtils extends Logging with SparkCharVarcharUtils {
private val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
@@ -49,41 +48,6 @@ object CharVarcharUtils extends Logging {
})
}
- /**
- * Returns true if the given data type is CharType/VarcharType or has nested CharType/VarcharType.
- */
- def hasCharVarchar(dt: DataType): Boolean = {
- dt.existsRecursively(f => f.isInstanceOf[CharType] || f.isInstanceOf[VarcharType])
- }
-
- /**
- * Validate the given [[DataType]] to fail if it is char or varchar types or contains nested ones
- */
- def failIfHasCharVarchar(dt: DataType): DataType = {
- if (!SQLConf.get.charVarcharAsString && hasCharVarchar(dt)) {
- throw QueryCompilationErrors.charOrVarcharTypeAsStringUnsupportedError()
- } else {
- replaceCharVarcharWithString(dt)
- }
- }
-
- /**
- * Replaces CharType/VarcharType with StringType recursively in the given data type.
- */
- def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
- case ArrayType(et, nullable) =>
- ArrayType(replaceCharVarcharWithString(et), nullable)
- case MapType(kt, vt, nullable) =>
- MapType(replaceCharVarcharWithString(kt), replaceCharVarcharWithString(vt), nullable)
- case StructType(fields) =>
- StructType(fields.map { field =>
- field.copy(dataType = replaceCharVarcharWithString(field.dataType))
- })
- case _: CharType => StringType
- case _: VarcharType => StringType
- case _ => dt
- }
-
/**
* Replaces CharType with VarcharType recursively in the given data type.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6fd20b7c34a..957c8cb334b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2001,9 +2001,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
}
def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
- new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_1189",
- messageParameters = Map("operation" -> operation))
+ DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
}
def tempViewNotSupportStreamingWriteError(viewName: String): Throwable = {
@@ -2365,9 +2363,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
}
def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
- new AnalysisException(
- errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
- messageParameters = Map.empty)
+ DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError()
}
def escapeCharacterInTheMiddleError(pattern: String, char: String): Throwable = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 433d23eaf3f..7551e58c942 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2194,17 +2194,11 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
}
def fieldIndexOnRowWithoutSchemaError(): SparkUnsupportedOperationException = {
- new SparkUnsupportedOperationException(
- errorClass = "_LEGACY_ERROR_TEMP_2231",
- messageParameters = Map.empty)
+ DataTypeErrors.fieldIndexOnRowWithoutSchemaError()
}
def valueIsNullError(index: Int): Throwable = {
- new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2232",
- messageParameters = Map(
- "index" -> toSQLValue(index, IntegerType)),
- cause = null)
+ DataTypeErrors.valueIsNullError(index)
}
def onlySupportDataSourcesProvidingFileFormatError(providingClass: String): Throwable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 4a7632486c0..55009f2416c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -794,15 +794,11 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
test("invalidate char/varchar in functions") {
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
- parameters = Map.empty,
- context = ExpectedContext(
- fragment = "from_json('{\"a\": \"str\"}', 'a CHAR(5)')",
- start = 7,
- stop = 44)
+ parameters = Map.empty
)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
val df = sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
@@ -816,19 +812,19 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
val df = spark.range(10).map(_.toString).toDF()
val schema = new StructType().add("id", CharType(5))
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.createDataFrame(df.collectAsList(), schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.createDataFrame(df.rdd, schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.createDataFrame(df.toJavaRDD, schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -842,12 +838,12 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
test("invalidate char/varchar in spark.read.schema") {
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.read.schema(new StructType().add("id", CharType(5)))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING")
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.read.schema("id char(5)")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -884,13 +880,13 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
test("invalidate char/varchar in udf's result type") {
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.udf.register("testchar", () => "B", VarcharType(1))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.udf.register("testchar2", (x: String) => x, VarcharType(1))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -909,13 +905,13 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
test("invalidate char/varchar in spark.readStream.schema") {
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.readStream.schema(new StructType().add("id", CharType(5)))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
- exception = intercept[AnalysisException] {
+ exception = intercept[SparkException] {
spark.readStream.schema("id char(5)")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -938,7 +934,7 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
sql("CREATE TABLE t(c char(10), v varchar(255)) USING parquet")
sql("INSERT INTO t VALUES('spark', 'awesome')")
val df = sql("SELECT * FROM t")
- checkError(exception = intercept[AnalysisException] {
+ checkError(exception = intercept[SparkException] {
df.to(newSchema)
}, errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING", parameters = Map.empty)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 93b6652d516..94c344fc384 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1364,12 +1364,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata),
StructField("theid", IntegerType, false, defaultMetadata)))
val parts = Array[String]("THEID < 2", "THEID >= 2")
- val e1 = intercept[AnalysisException] {
+ val e1 = intercept[SparkException] {
spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
}.getMessage
assert(e1.contains("User specified schema not supported with `jdbc`"))
- val e2 = intercept[AnalysisException] {
+ val e2 = intercept[SparkException] {
spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
}.getMessage
assert(e2.contains("User specified schema not supported with `jdbc`"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 17348fe2dcb..967d384d979 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkContext, TestUtils}
+import org.apache.spark.{SparkContext, SparkException, TestUtils}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -633,12 +633,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
testRead(Option(dir).map(spark.read.text).get, data, textSchema)
// Reader, with user specified schema, should just apply user schema on the file data
- val e = intercept[AnalysisException] { spark.read.schema(userSchema).textFile() }
+ val e = intercept[SparkException] { spark.read.schema(userSchema).textFile() }
assert(e.getMessage.toLowerCase(Locale.ROOT).contains(
"user specified schema not supported"))
- intercept[AnalysisException] { spark.read.schema(userSchema).textFile(dir) }
- intercept[AnalysisException] { spark.read.schema(userSchema).textFile(dir, dir) }
- intercept[AnalysisException] { spark.read.schema(userSchema).textFile(Seq(dir, dir): _*) }
+ intercept[SparkException] { spark.read.schema(userSchema).textFile(dir) }
+ intercept[SparkException] { spark.read.schema(userSchema).textFile(dir, dir) }
+ intercept[SparkException] { spark.read.schema(userSchema).textFile(Seq(dir, dir): _*) }
}
test("csv - API and behavior regarding schema") {
@@ -969,7 +969,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
test("SPARK-16848: table API throws an exception for user specified schema") {
withTable("t") {
val schema = StructType(StructField("a", StringType) :: Nil)
- val e = intercept[AnalysisException] {
+ val e = intercept[SparkException] {
spark.read.schema(schema).table("t")
}.getMessage
assert(e.contains("User specified schema not supported with `table`"))