This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 515dfc166a9  [SPARK-44496][SQL][CONNECT] Move Interfaces needed by SCSC to sql/api
515dfc166a9 is described below

commit 515dfc166a95ecc8decce0f0cd99e06fe395f94f
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Sat Jul 22 19:53:04 2023 -0400

    [SPARK-44496][SQL][CONNECT] Move Interfaces needed by SCSC to sql/api

    ### What changes were proposed in this pull request?

    This PR moves some interfaces and utils that are needed by the Scala client to sql/api.

    ### Why are the changes needed?

    So that the Scala client does not need to depend on Spark.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    UT

    Closes #42092 from amaliujia/row_coder.

    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/api/java/function/CoGroupFunction.java   |  0
 .../api/java/function/DoubleFlatMapFunction.java   |  0
 .../spark/api/java/function/DoubleFunction.java    |  0
 .../spark/api/java/function/FilterFunction.java    |  0
 .../spark/api/java/function/FlatMapFunction.java   |  0
 .../spark/api/java/function/FlatMapFunction2.java  |  0
 .../api/java/function/FlatMapGroupsFunction.java   |  0
 .../spark/api/java/function/ForeachFunction.java   |  0
 .../java/function/ForeachPartitionFunction.java    |  0
 .../apache/spark/api/java/function/Function.java   |  0
 .../apache/spark/api/java/function/Function0.java  |  0
 .../apache/spark/api/java/function/Function2.java  |  0
 .../apache/spark/api/java/function/Function3.java  |  0
 .../apache/spark/api/java/function/Function4.java  |  0
 .../spark/api/java/function/MapFunction.java       |  0
 .../spark/api/java/function/MapGroupsFunction.java |  0
 .../api/java/function/MapPartitionsFunction.java   |  0
 .../api/java/function/PairFlatMapFunction.java     |  0
 .../spark/api/java/function/PairFunction.java      |  0
 .../spark/api/java/function/ReduceFunction.java    |  0
 .../spark/api/java/function/VoidFunction.java      |  0
 .../spark/api/java/function/VoidFunction2.java     |  0
 .../spark/api/java/function/package-info.java      |  0
 .../apache/spark/api/java/function/package.scala   |  0
 .../org/apache/spark/util/SparkClassUtils.scala    |  4 ++
 connector/connect/client/jvm/pom.xml               |  5 ++
 .../main/scala/org/apache/spark/sql/Column.scala   |  4 +-
 .../org/apache/spark/sql/DataFrameReader.scala     |  8 +--
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 12 ++---
 .../main/scala/org/apache/spark/util/Utils.scala   |  1 -
 project/MimaExcludes.scala                         | 29 +++++++++++
 .../main/scala/org/apache/spark/sql/Encoder.scala  |  0
 .../src/main/scala/org/apache/spark/sql/Row.scala  |  6 +--
 .../scala/org/apache/spark/sql/SqlApiConf.scala    |  2 +
 .../sql/catalyst/encoders/AgnosticEncoder.scala    |  4 +-
 .../sql/catalyst/expressions/GenericRow.scala      | 25 ++++++---
 .../sql/catalyst/expressions/OrderUtils.scala      | 25 +++++----
 .../sql/catalyst/util/SparkCharVarcharUtils.scala  | 60 ++++++++++++++++++++++
 .../apache/spark/sql/errors/DataTypeErrors.scala   | 28 ++++++++++
 .../spark/sql/catalyst/expressions/ordering.scala  |  9 +---
 .../spark/sql/catalyst/expressions/rows.scala      | 21 --------
 .../spark/sql/catalyst/util/CharVarcharUtils.scala | 38 +-------------
 .../spark/sql/errors/QueryCompilationErrors.scala  |  8 +--
 .../spark/sql/errors/QueryExecutionErrors.scala    | 10 +---
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 28 +++++-----
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  4 +-
 .../sql/test/DataFrameReaderWriterSuite.scala      | 12 ++---
 47 files changed, 203 insertions(+), 140 deletions(-)
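For orientation, the bulk of the moved files are the Java functional interfaces under org.apache.spark.api.java.function, which the Connect client's Dataset API accepts. A minimal hypothetical sketch of why the client needs them (`ds` is any `Dataset[String]`; behavior is unchanged by the move, only the hosting module changes):

    import org.apache.spark.api.java.function.MapFunction
    import org.apache.spark.sql.Encoders

    // MapFunction now lives in common/utils, reachable without spark-core.
    val upper = new MapFunction[String, String] {
      override def call(v: String): String = v.toUpperCase
    }
    val out = ds.map(upper, Encoders.STRING)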
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
similarity index 100%
copy from core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
copy to common/utils/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/Function.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function0.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function0.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/Function0.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function2.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function2.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/Function2.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function3.java
similarity index 100%
copy from core/src/main/java/org/apache/spark/api/java/function/Function3.java
copy to common/utils/src/main/java/org/apache/spark/api/java/function/Function3.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/common/utils/src/main/java/org/apache/spark/api/java/function/Function4.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/Function4.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/Function4.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/MapFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/MapFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/PairFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/PairFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/package-info.java b/common/utils/src/main/java/org/apache/spark/api/java/function/package-info.java
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/package-info.java
rename to common/utils/src/main/java/org/apache/spark/api/java/function/package-info.java
diff --git a/core/src/main/java/org/apache/spark/api/java/function/package.scala b/common/utils/src/main/java/org/apache/spark/api/java/function/package.scala
similarity index 100%
rename from core/src/main/java/org/apache/spark/api/java/function/package.scala
rename to common/utils/src/main/java/org/apache/spark/api/java/function/package.scala
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
index 7401e376241..a237869aef3 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
@@ -16,9 +16,13 @@
  */
 package org.apache.spark.util
 
+import java.util.Random
+
 import scala.util.Try
 
 trait SparkClassUtils {
+  val random = new Random()
+
   def getSparkClassLoader: ClassLoader = getClass.getClassLoader
 
   def getContextOrSparkClassLoader: ClassLoader =
diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml
index 60ed0f3ba46..6605496a165 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -64,6 +64,11 @@
       <artifactId>spark-common-utils_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sketch_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
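The SparkClassUtils change above is what lets the client drop core's Utils object: `random` joins `getContextOrSparkClassLoader` in the shared trait. A minimal sketch of the resulting call sites (hypothetical usage; the `SparkClassUtils` companion object is what the Dataset changes below call into):

    import org.apache.spark.util.SparkClassUtils

    // Both helpers now come from common/utils rather than spark-core.
    val seed: Long = SparkClassUtils.random.nextLong()
    val tuple2: Class[_] =
      SparkClassUtils.getContextOrSparkClassLoader.loadClass("scala.Tuple2")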
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
index 6a660a7482e..4a527040d80 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
@@ -24,7 +24,7 @@ import org.apache.spark.connect.proto.Expression.SortOrder.NullOrdering
 import org.apache.spark.connect.proto.Expression.SortOrder.SortDirection
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
@@ -1087,7 +1087,7 @@ class Column private[sql] (@DeveloperApi val expr: proto.Expression) extends Log
    * @group expr_ops
    * @since 3.4.0
    */
-  def cast(to: String): Column = cast(CatalystSqlParser.parseDataType(to))
+  def cast(to: String): Column = cast(DataTypeParser.parseDataType(to))
 
   /**
    * Returns a sort expression based on the descending order of the column.
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 40f9ac1df2b..10d2af094a0 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -25,9 +25,9 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.connect.proto.Parse.ParseFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, SparkCharVarcharUtils}
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -58,7 +58,7 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
    */
   def schema(schema: StructType): DataFrameReader = {
     if (schema != null) {
-      val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+      val replaced = SparkCharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
       this.userSpecifiedSchema = Option(replaced)
     }
     this
@@ -563,7 +563,7 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
    */
   private def assertNoSpecifiedSchema(operation: String): Unit = {
     if (userSpecifiedSchema.nonEmpty) {
-      throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
+      throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
     }
   }
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 47e361c9679..b0dd91293a0 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.function._
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
-import org.apache.spark.sql.catalyst.expressions.RowOrdering
+import org.apache.spark.sql.catalyst.expressions.OrderUtils
 import org.apache.spark.sql.connect.client.SparkResult
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter, UdfUtils}
 import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
@@ -37,7 +37,7 @@ import org.apache.spark.sql.functions.{struct, to_json}
 import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types.{Metadata, StructType}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkClassUtils
 
 /**
  * A Dataset is a strongly typed collection of domain-specific objects that can be transformed in
@@ -876,7 +876,7 @@ class Dataset[T] private[sql] (
     val tupleEncoder = ProductEncoder[(T, U)](
-      ClassTag(Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
+      ClassTag(SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
       Seq(
         EncoderField(s"_1", this.encoder, leftNullable, Metadata.empty),
         EncoderField(s"_2", other.encoder, rightNullable, Metadata.empty)))
@@ -2035,7 +2035,7 @@ class Dataset[T] private[sql] (
    * @since 3.4.0
    */
   def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = {
-    sample(withReplacement, fraction, Utils.random.nextLong)
+    sample(withReplacement, fraction, SparkClassUtils.random.nextLong)
   }
 
   /**
@@ -2069,7 +2069,7 @@ class Dataset[T] private[sql] (
     // between construction and execution the query might fail or produce wrong results. Another
     // problem can come from data that arrives between the execution of the returned datasets.
     val sortOrder = schema.collect {
-      case f if RowOrdering.isOrderable(f.dataType) => col(f.name).asc
+      case f if OrderUtils.isOrderable(f.dataType) => col(f.name).asc
     }
     val sortedInput = sortWithinPartitions(sortOrder: _*).plan.getRoot
     val sum = weights.sum
@@ -2114,7 +2114,7 @@ class Dataset[T] private[sql] (
    * @since 3.4.0
    */
   def randomSplit(weights: Array[Double]): Array[Dataset[T]] = {
-    randomSplit(weights, Utils.random.nextLong)
+    randomSplit(weights, SparkClassUtils.random.nextLong)
   }
 
   private def withColumns(names: Seq[String], values: Seq[Column]): DataFrame = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5c9eea7a151..2d61b1b6305 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -92,7 +92,6 @@ private[spark] object CallSite {
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging with SparkClassUtils {
-  val random = new Random()
   private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
   @volatile private var cachedLocalDir: String = ""
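One user-visible detail worth noting from the Column hunk: the string overload of `cast` now parses its argument with `DataTypeParser` from sql/api instead of catalyst's `CatalystSqlParser`. A hypothetical sketch of the call it affects (`df` here is any Connect DataFrame, used only for illustration):

    // Same behavior, different parser underneath.
    val priced = df.col("price").cast("decimal(10,2)")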
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 35425017ea5..729c41755de 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -174,6 +174,35 @@ object MimaExcludes {
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.types.ObjectType"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.types.ObjectType$"),
+    // SPARK-44496: Move Interfaces needed by SCSC to sql/api.
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoder"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Row"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Row$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.package"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.package$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.CoGroupFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.DoubleFlatMapFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.DoubleFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FilterFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapFunction2"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapGroupsFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ForeachFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ForeachPartitionFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function0"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function2"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function3"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.Function4"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapGroupsFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapPartitionsFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.PairFlatMapFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.PairFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.ReduceFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.VoidFunction"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.VoidFunction2"),
+
     (problem: Problem) => problem match {
       case MissingClassProblem(cls) =>
         !cls.fullName.startsWith("org.sparkproject.jpmml") &&
         !cls.fullName.startsWith("org.sparkproject.dmg.pmml")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/Encoder.scala
similarity index 100%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/Encoder.scala
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/api/src/main/scala/org/apache/spark/sql/Row.scala
similarity index 98%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/Row.scala
index d223da6503f..1765a5444df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Row.scala
@@ -22,7 +22,7 @@ import scala.util.hashing.MurmurHash3
 
 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.types._
 
 /**
@@ -366,7 +366,7 @@ trait Row extends Serializable {
    * @throws IllegalArgumentException when a field `name` does not exist.
    */
   def fieldIndex(name: String): Int = {
-    throw QueryExecutionErrors.fieldIndexOnRowWithoutSchemaError()
+    throw DataTypeErrors.fieldIndexOnRowWithoutSchemaError()
   }
 
   /**
@@ -511,6 +511,6 @@ trait Row extends Serializable {
    * @throws NullPointerException when value is null.
    */
   private def getAnyValAs[T <: AnyVal](i: Int): T =
-    if (isNullAt(i)) throw QueryExecutionErrors.valueIsNullError(i)
+    if (isNullAt(i)) throw DataTypeErrors.valueIsNullError(i)
     else getAs[T](i)
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala b/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
index c297a2e067a..48efa510666 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/SqlApiConf.scala
@@ -40,6 +40,7 @@ private[sql] trait SqlApiConf {
   def doubleQuotedIdentifiers: Boolean
   def timestampType: AtomicType
   def allowNegativeScaleOfDecimalEnabled: Boolean
+  def charVarcharAsString: Boolean
 }
 
 private[sql] object SqlApiConf {
@@ -74,4 +75,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
   override def doubleQuotedIdentifiers: Boolean = false
   override def timestampType: AtomicType = TimestampType
   override def allowNegativeScaleOfDecimalEnabled: Boolean = false
+  override def charVarcharAsString: Boolean = false
 }
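The Row hunk reroutes its two error call sites from QueryExecutionErrors to DataTypeErrors so the trait can live in sql/api. A quick hypothetical sketch of the behavior those call sites guard (a schemaless Row cannot look up fields by name):

    import org.apache.spark.sql.Row

    val row: Row = Row(1, "spark")  // built without a schema
    row.getInt(0)                   // fine: positional access
    // row.fieldIndex("id")         // would throw, now via
    //                              // DataTypeErrors.fieldIndexOnRowWithoutSchemaError()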
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
similarity index 98%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
index e2c6452b7df..99f33214d20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
@@ -26,7 +26,7 @@ import scala.reflect.{classTag, ClassTag}
 import org.apache.spark.sql.{Encoder, Row}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkClassUtils
 
 /**
  * A non implementation specific encoder. This encoder contains all the information needed
@@ -122,7 +122,7 @@ object AgnosticEncoders {
       case (e, id) => EncoderField(s"_${id + 1}", e, e.nullable, Metadata.empty)
     }
     val cls = cachedCls.computeIfAbsent(encoders.size,
-      _ => Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}"))
+      _ => SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}"))
     ProductEncoder[Any](ClassTag(cls), fields)
   }
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
similarity index 54%
rename from core/src/main/java/org/apache/spark/api/java/function/Function3.java
rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
index 77acd21d4ef..f4b45f5928e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
@@ -14,15 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.sql.catalyst.expressions
 
-package org.apache.spark.api.java.function;
-
-import java.io.Serializable;
+import org.apache.spark.sql.Row
 
 /**
- * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ * A row implementation that uses an array of objects as the underlying storage. Note that, while
+ * the array is not copied, and thus could technically be mutated after creation, this is not
+ * allowed.
  */
-@FunctionalInterface
-public interface Function3<T1, T2, T3, R> extends Serializable {
-  R call(T1 v1, T2 v2, T3 v3) throws Exception;
+class GenericRow(protected[sql] val values: Array[Any]) extends Row {
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null)
+
+  def this(size: Int) = this(new Array[Any](size))
+
+  override def length: Int = values.length
+
+  override def get(i: Int): Any = values(i)
+
+  override def toSeq: Seq[Any] = values.clone()
+
+  override def copy(): GenericRow = this
 }
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
similarity index 56%
rename from core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
index a6f69f7cdca..9319b104024 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala
@@ -14,17 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.sql.catalyst.expressions
 
-package org.apache.spark.api.java.function;
+import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, NullType, StructType, UserDefinedType}
 
-import java.io.Serializable;
-
-/**
- * Base interface for a function used in Dataset's filter function.
- *
- * If the function returns true, the element is included in the returned Dataset.
- */
-@FunctionalInterface
-public interface FilterFunction<T> extends Serializable {
-  boolean call(T value) throws Exception;
+object OrderUtils {
+  /**
+   * Returns true iff the data type can be ordered (i.e. can be sorted).
+   */
+  def isOrderable(dataType: DataType): Boolean = dataType match {
+    case NullType => true
+    case dt: AtomicType => true
+    case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
+    case array: ArrayType => isOrderable(array.elementType)
+    case udt: UserDefinedType[_] => isOrderable(udt.sqlType)
+    case _ => false
+  }
 }
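A hypothetical sketch of the two classes relocated above, written directly against the sql/api types they now depend on:

    import org.apache.spark.sql.catalyst.expressions.{GenericRow, OrderUtils}
    import org.apache.spark.sql.types._

    // GenericRow wraps an existing array without copying it.
    val row = new GenericRow(Array[Any](1, "spark"))
    assert(row.length == 2 && row.get(0) == 1)

    // isOrderable recurses into structs and arrays; maps fall through to false.
    assert(OrderUtils.isOrderable(new StructType().add("a", IntegerType)))
    assert(!OrderUtils.isOrderable(MapType(StringType, IntegerType)))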
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala
new file mode 100644
index 00000000000..47b3ef9b05a
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.SqlApiConf
+import org.apache.spark.sql.errors.DataTypeErrors
+import org.apache.spark.sql.types.{ArrayType, CharType, DataType, MapType, StringType, StructType, VarcharType}
+
+trait SparkCharVarcharUtils {
+  /**
+   * Returns true if the given data type is CharType/VarcharType or has nested CharType/VarcharType.
+   */
+  def hasCharVarchar(dt: DataType): Boolean = {
+    dt.existsRecursively(f => f.isInstanceOf[CharType] || f.isInstanceOf[VarcharType])
+  }
+
+  /**
+   * Validate the given [[DataType]] to fail if it is char or varchar types or contains nested ones
+   */
+  def failIfHasCharVarchar(dt: DataType): DataType = {
+    if (!SqlApiConf.get.charVarcharAsString && hasCharVarchar(dt)) {
+      throw DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError()
+    } else {
+      replaceCharVarcharWithString(dt)
+    }
+  }
+
+  /**
+   * Replaces CharType/VarcharType with StringType recursively in the given data type.
+   */
+  def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
+    case ArrayType(et, nullable) =>
+      ArrayType(replaceCharVarcharWithString(et), nullable)
+    case MapType(kt, vt, nullable) =>
+      MapType(replaceCharVarcharWithString(kt), replaceCharVarcharWithString(vt), nullable)
+    case StructType(fields) =>
+      StructType(fields.map { field =>
+        field.copy(dataType = replaceCharVarcharWithString(field.dataType))
+      })
+    case _: CharType => StringType
+    case _: VarcharType => StringType
+    case _ => dt
+  }
+}
+
+object SparkCharVarcharUtils extends SparkCharVarcharUtils
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
index 69156e11658..fcc3086b573 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
@@ -267,4 +267,32 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
       errorClass = "FAILED_PARSE_STRUCT_TYPE",
       messageParameters = Map("raw" -> s"'$raw'"))
   }
+
+  def fieldIndexOnRowWithoutSchemaError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2231",
+      messageParameters = Map.empty)
+  }
+
+  def valueIsNullError(index: Int): Throwable = {
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2232",
+      messageParameters = Map(
+        "index" -> index.toString),
+      cause = null)
+  }
+
+  def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
+    new SparkException(
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
+      messageParameters = Map.empty,
+      cause = null)
+  }
+
+  def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_1189",
+      messageParameters = Map("operation" -> operation),
+      cause = null)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index ab68eb266cb..41c68d439a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -91,14 +91,7 @@ object RowOrdering extends CodeGeneratorWithInterpretedFallback[Seq[SortOrder],
   /**
    * Returns true iff the data type can be ordered (i.e. can be sorted).
    */
-  def isOrderable(dataType: DataType): Boolean = dataType match {
-    case NullType => true
-    case dt: AtomicType => true
-    case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
-    case array: ArrayType => isOrderable(array.elementType)
-    case udt: UserDefinedType[_] => isOrderable(udt.sqlType)
-    case _ => false
-  }
+  def isOrderable(dataType: DataType): Boolean = OrderUtils.isOrderable(dataType)
 
   /**
    * Returns true iff outputs from the expressions can be ordered.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 65539a2f00e..10c33a43270 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types._
@@ -158,26 +157,6 @@ trait BaseGenericInternalRow extends InternalRow {
   }
 }
 
-/**
- * A row implementation that uses an array of objects as the underlying storage. Note that, while
- * the array is not copied, and thus could technically be mutated after creation, this is not
- * allowed.
- */
-class GenericRow(protected[sql] val values: Array[Any]) extends Row {
-  /** No-arg constructor for serialization. */
-  protected def this() = this(null)
-
-  def this(size: Int) = this(new Array[Any](size))
-
-  override def length: Int = values.length
-
-  override def get(i: Int): Any = values(i)
-
-  override def toSeq: Seq[Any] = values.clone()
-
-  override def copy(): GenericRow = this
-}
-
 class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
   extends GenericRow(values) {
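A hypothetical sketch of the shared trait introduced above, which both the server-side CharVarcharUtils and the Connect client's DataFrameReader now inherit:

    import org.apache.spark.sql.catalyst.util.SparkCharVarcharUtils
    import org.apache.spark.sql.types._

    val schema = new StructType().add("name", VarcharType(10))
    assert(SparkCharVarcharUtils.hasCharVarchar(schema))

    // Rewrites nested char/varchar to string: struct<name:string>
    val replaced = SparkCharVarcharUtils.replaceCharVarcharWithString(schema)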
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
index 44810634358..b9d83d44490 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
@@ -23,11 +23,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-object CharVarcharUtils extends Logging {
+object CharVarcharUtils extends Logging with SparkCharVarcharUtils {
 
   private val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
 
@@ -49,41 +48,6 @@ object CharVarcharUtils extends Logging {
     })
   }
 
-  /**
-   * Returns true if the given data type is CharType/VarcharType or has nested CharType/VarcharType.
-   */
-  def hasCharVarchar(dt: DataType): Boolean = {
-    dt.existsRecursively(f => f.isInstanceOf[CharType] || f.isInstanceOf[VarcharType])
-  }
-
-  /**
-   * Validate the given [[DataType]] to fail if it is char or varchar types or contains nested ones
-   */
-  def failIfHasCharVarchar(dt: DataType): DataType = {
-    if (!SQLConf.get.charVarcharAsString && hasCharVarchar(dt)) {
-      throw QueryCompilationErrors.charOrVarcharTypeAsStringUnsupportedError()
-    } else {
-      replaceCharVarcharWithString(dt)
-    }
-  }
-
-  /**
-   * Replaces CharType/VarcharType with StringType recursively in the given data type.
-   */
-  def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
-    case ArrayType(et, nullable) =>
-      ArrayType(replaceCharVarcharWithString(et), nullable)
-    case MapType(kt, vt, nullable) =>
-      MapType(replaceCharVarcharWithString(kt), replaceCharVarcharWithString(vt), nullable)
-    case StructType(fields) =>
-      StructType(fields.map { field =>
-        field.copy(dataType = replaceCharVarcharWithString(field.dataType))
-      })
-    case _: CharType => StringType
-    case _: VarcharType => StringType
-    case _ => dt
-  }
-
   /**
    * Replaces CharType with VarcharType recursively in the given data type.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6e809d5bbf0..8120eb9b57e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2008,9 +2008,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }
 
   def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
-    new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1189",
-      messageParameters = Map("operation" -> operation))
+    DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
   }
 
   def tempViewNotSupportStreamingWriteError(viewName: String): Throwable = {
@@ -2380,9 +2378,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }
 
   def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
-    new AnalysisException(
-      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
-      messageParameters = Map.empty)
+    DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError()
   }
 
   def escapeCharacterInTheMiddleError(pattern: String, char: String): Throwable = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 983648ff673..183c5425202 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2194,17 +2194,11 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
   }
 
   def fieldIndexOnRowWithoutSchemaError(): SparkUnsupportedOperationException = {
-    new SparkUnsupportedOperationException(
-      errorClass = "_LEGACY_ERROR_TEMP_2231",
-      messageParameters = Map.empty)
+    DataTypeErrors.fieldIndexOnRowWithoutSchemaError()
   }
 
   def valueIsNullError(index: Int): Throwable = {
-    new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2232",
-      messageParameters = Map(
-        "index" -> toSQLValue(index, IntegerType)),
-      cause = null)
+    DataTypeErrors.valueIsNullError(index)
   }
 
   def onlySupportDataSourcesProvidingFileFormatError(providingClass: String): Throwable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 4a7632486c0..55009f2416c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -794,15 +794,11 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
   test("invalidate char/varchar in functions") {
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
-      parameters = Map.empty,
-      context = ExpectedContext(
-        fragment = "from_json('{\"a\": \"str\"}', 'a CHAR(5)')",
-        start = 7,
-        stop = 44)
+      parameters = Map.empty
     )
     withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
       val df = sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
@@ -816,19 +812,19 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
     val df = spark.range(10).map(_.toString).toDF()
     val schema = new StructType().add("id", CharType(5))
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.createDataFrame(df.collectAsList(), schema)
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
     )
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.createDataFrame(df.rdd, schema)
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
     )
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.createDataFrame(df.toJavaRDD, schema)
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -842,12 +838,12 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
   test("invalidate char/varchar in spark.read.schema") {
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.read.schema(new StructType().add("id", CharType(5)))
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING")
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.read.schema("id char(5)")
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -884,13 +880,13 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
   test("invalidate char/varchar in udf's result type") {
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.udf.register("testchar", () => "B", VarcharType(1))
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
     )
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.udf.register("testchar2", (x: String) => x, VarcharType(1))
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -909,13 +905,13 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
   test("invalidate char/varchar in spark.readStream.schema") {
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.readStream.schema(new StructType().add("id", CharType(5)))
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
     )
     checkError(
-      exception = intercept[AnalysisException] {
+      exception = intercept[SparkException] {
         spark.readStream.schema("id char(5)")
       },
       errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
@@ -938,7 +934,7 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
     sql("CREATE TABLE t(c char(10), v varchar(255)) USING parquet")
     sql("INSERT INTO t VALUES('spark', 'awesome')")
     val df = sql("SELECT * FROM t")
-    checkError(exception = intercept[AnalysisException] {
+    checkError(exception = intercept[SparkException] {
       df.to(newSchema)
     }, errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING", parameters = Map.empty)
     withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 93b6652d516..94c344fc384 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1364,12 +1364,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata),
       StructField("theid", IntegerType, false, defaultMetadata)))
     val parts = Array[String]("THEID < 2", "THEID >= 2")
-    val e1 = intercept[AnalysisException] {
+    val e1 = intercept[SparkException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
     }.getMessage
     assert(e1.contains("User specified schema not supported with `jdbc`"))
 
-    val e2 = intercept[AnalysisException] {
+    val e2 = intercept[SparkException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
     }.getMessage
     assert(e2.contains("User specified schema not supported with `jdbc`"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 17348fe2dcb..967d384d979 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkContext, TestUtils}
+import org.apache.spark.{SparkContext, SparkException, TestUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -633,12 +633,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     testRead(Option(dir).map(spark.read.text).get, data, textSchema)
 
     // Reader, with user specified schema, should just apply user schema on the file data
-    val e = intercept[AnalysisException] { spark.read.schema(userSchema).textFile() }
+    val e = intercept[SparkException] { spark.read.schema(userSchema).textFile() }
     assert(e.getMessage.toLowerCase(Locale.ROOT).contains(
       "user specified schema not supported"))
-    intercept[AnalysisException] { spark.read.schema(userSchema).textFile(dir) }
-    intercept[AnalysisException] { spark.read.schema(userSchema).textFile(dir, dir) }
-    intercept[AnalysisException] { spark.read.schema(userSchema).textFile(Seq(dir, dir): _*) }
+    intercept[SparkException] { spark.read.schema(userSchema).textFile(dir) }
+    intercept[SparkException] { spark.read.schema(userSchema).textFile(dir, dir) }
+    intercept[SparkException] { spark.read.schema(userSchema).textFile(Seq(dir, dir): _*) }
   }
 
   test("csv - API and behavior regarding schema") {
@@ -969,7 +969,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
   test("SPARK-16848: table API throws an exception for user specified schema") {
     withTable("t") {
       val schema = StructType(StructField("a", StringType) :: Nil)
-      val e = intercept[AnalysisException] {
+      val e = intercept[SparkException] {
         spark.read.schema(schema).table("t")
       }.getMessage
       assert(e.contains("User specified schema not supported with `table`"))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org