This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f375930 [SPARK-31087] [SQL] Add Back Multiple Removed APIs
f375930 is described below
commit f375930d81337f2facbe5da71bb126d4d935e49d
Author: gatorsmile <[email protected]>
AuthorDate: Sat Mar 28 22:05:16 2020 -0700
[SPARK-31087] [SQL] Add Back Multiple Removed APIs
### What changes were proposed in this pull request?
Based on the discussion in the mailing list thread [[Proposal] Modification to
Spark's Semantic Versioning
Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html),
this PR adds back the following APIs, whose maintenance cost is relatively
small (see the usage sketch after this list):
- functions.toDegrees/toRadians
- functions.approxCountDistinct
- functions.monotonicallyIncreasingId
- Column.!==
- Dataset.explode
- Dataset.registerTempTable
- SQLContext.getOrCreate, setActive, clearActive, constructors
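To make the mapping concrete, the sketch below pairs each restored call with its non-deprecated replacement. It is a minimal illustration and not part of this patch: the local `SparkSession`, the `RestoredApisSketch` object, and the sample `id`/`deg` data are assumptions made only for the example.
```scala
// Minimal sketch (not part of this patch): exercises the restored deprecated APIs
// next to their current replacements, assuming a local SparkSession and made-up data.
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.functions._

object RestoredApisSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("restored-apis").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 45.0), (2, 90.0), (2, 180.0)).toDF("id", "deg")

    // functions shims next to their current equivalents
    df.select(toDegrees($"deg"), degrees($"deg")).show()                     // toDegrees -> degrees
    df.select(toRadians($"deg"), radians($"deg")).show()                     // toRadians -> radians
    df.select(approxCountDistinct("id"), approx_count_distinct("id")).show() // -> approx_count_distinct
    df.select(monotonicallyIncreasingId(), monotonically_increasing_id()).show()

    // Column.!== and its replacement =!=
    df.filter($"id" !== 1).show()

    // Dataset.explode and Dataset.registerTempTable
    // (prefer select(explode(...)) or flatMap, and createOrReplaceTempView, going forward)
    df.explode("deg", "rad") { d: Double => Seq(math.toRadians(d)) }.show()
    df.registerTempTable("t")
    spark.sql("SELECT count(*) FROM t").show()

    // SQLContext compatibility: constructor, getOrCreate, setActive, clearActive
    val legacyCtx = new SQLContext(spark.sparkContext)
    SQLContext.setActive(SQLContext.getOrCreate(spark.sparkContext))
    SQLContext.clearActive()

    spark.stop()
  }
}
```
As the diff below shows, each restored method is a thin forwarder to its replacement (or, for SQLContext, to SparkSession), which is why the maintenance cost stays small.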
Below are the other APIs that were removed in the original PR
[https://issues.apache.org/jira/browse/SPARK-25908] but are not added back in
this PR:
- Remove some AccumulableInfo.apply() methods
- Remove non-label-specific multiclass precision/recall/fScore in favor of
accuracy
- Remove unused Python StorageLevel constants
- Remove unused multiclass option in libsvm parsing
- Remove references to deprecated spark configs like spark.yarn.am.port
- Remove TaskContext.isRunningLocally
- Remove ShuffleMetrics.shuffle* methods
- Remove BaseReadWrite.context in favor of session
### Why are the changes needed?
Avoid breaking the APIs that are commonly used.
### Does this PR introduce any user-facing change?
Adding back the APIs that were removed in the 3.0 branch does not introduce
user-facing changes, because Spark 3.0 has not been released.
### How was this patch tested?
Added a new test suite for these APIs.
Author: gatorsmile <[email protected]>
Author: yi.wu <[email protected]>
Closes #27821 from gatorsmile/addAPIBackV2.
(cherry picked from commit 3884455780a214c620f309e00d5a083039746755)
Signed-off-by: gatorsmile <[email protected]>
---
project/MimaExcludes.scala | 8 --
python/pyspark/sql/dataframe.py | 19 ++++
python/pyspark/sql/functions.py | 11 ++
.../main/scala/org/apache/spark/sql/Column.scala | 18 ++++
.../main/scala/org/apache/spark/sql/Dataset.scala | 98 ++++++++++++++++++
.../scala/org/apache/spark/sql/SQLContext.scala | 50 ++++++++-
.../scala/org/apache/spark/sql/functions.scala | 79 ++++++++++++++
.../org/apache/spark/sql/DataFrameSuite.scala | 46 +++++++++
.../org/apache/spark/sql/DeprecatedAPISuite.scala | 114 +++++++++++++++++++++
.../org/apache/spark/sql/SQLContextSuite.scala | 30 ++++--
10 files changed, 458 insertions(+), 15 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9a5029e..d1ed48a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -235,14 +235,6 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleWriteTime"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleRecordsWritten"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.AccumulableInfo.apply"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.approxCountDistinct"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toRadians"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toDegrees"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.monotonicallyIncreasingId"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.clearActive"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.getOrCreate"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.setActive"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.SQLContext.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.fMeasure"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.recall"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.precision"),
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 44cb264..2a366dc 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -122,6 +122,25 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
rdd = self._jdf.toJSON()
return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
+ @since(1.3)
+ def registerTempTable(self, name):
+ """Registers this DataFrame as a temporary table using the given name.
+
+ The lifetime of this temporary table is tied to the :class:`SparkSession`
+ that was used to create this :class:`DataFrame`.
+
+ >>> df.registerTempTable("people")
+ >>> df2 = spark.sql("select * from people")
+ >>> sorted(df.collect()) == sorted(df2.collect())
+ True
+ >>> spark.catalog.dropTempView("people")
+
+ .. note:: Deprecated in 2.0, use createOrReplaceTempView instead.
+ """
+ warnings.warn(
+ "Deprecated in 2.0, use createOrReplaceTempView instead.",
DeprecationWarning)
+ self._jdf.createOrReplaceTempView(name)
+
@since(2.0)
def createTempView(self, name):
"""Creates a local temporary view with this :class:`DataFrame`.
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 61a221c..543bb7f 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -296,6 +296,8 @@ _window_functions = {
# Wraps deprecated functions (keys) with the messages (values).
_functions_deprecated = {
+ 'toDegrees': 'Deprecated in 2.1, use degrees instead.',
+ 'toRadians': 'Deprecated in 2.1, use radians instead.',
}
for _name, _doc in _functions.items():
@@ -319,6 +321,15 @@ for _name, _doc in _functions_2_4.items():
del _name, _doc
+@since(1.3)
+def approxCountDistinct(col, rsd=None):
+ """
+ .. note:: Deprecated in 2.1, use :func:`approx_count_distinct` instead.
+ """
+ warnings.warn("Deprecated in 2.1, use approx_count_distinct instead.",
DeprecationWarning)
+ return approx_count_distinct(col, rsd)
+
+
@since(2.1)
def approx_count_distinct(col, rsd=None):
"""Aggregate function: returns a new :class:`Column` for approximate
distinct count of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 8bd5835..49c9f83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -331,6 +331,24 @@ class Column(val expr: Expression) extends Logging {
* df.filter( col("colA").notEqual(col("colB")) );
* }}}
*
+ * @group expr_ops
+ * @since 1.3.0
+ */
+ @deprecated("!== does not have the same precedence as ===, use =!= instead",
"2.0.0")
+ def !== (other: Any): Column = this =!= other
+
+ /**
+ * Inequality test.
+ * {{{
+ * // Scala:
+ * df.select( df("colA") !== df("colB") )
+ * df.select( !(df("colA") === df("colB")) )
+ *
+ * // Java:
+ * import static org.apache.spark.sql.functions.*;
+ * df.filter( col("colA").notEqual(col("colB")) );
+ * }}}
+ *
* @group java_expr_ops
* @since 1.3.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index b910136..5c3a82a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
@@ -34,6 +35,7 @@ import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.api.r.RRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
@@ -2266,6 +2268,90 @@ class Dataset[T] private[sql](
}
/**
+ * (Scala-specific) Returns a new Dataset where each row has been expanded to zero or more
+ * rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of
+ * the input row are implicitly joined with each row that is output by the function.
+ *
+ * Given that this is deprecated, as an alternative, you can explode columns either using
+ * `functions.explode()` or `flatMap()`. The following example uses these alternatives to count
+ * the number of books that contain a given word:
+ *
+ * {{{
+ * case class Book(title: String, words: String)
+ * val ds: Dataset[Book]
+ *
+ * val allWords = ds.select('title, explode(split('words, " ")).as("word"))
+ *
+ * val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
+ * }}}
+ *
+ * Using `flatMap()` this can similarly be exploded as:
+ *
+ * {{{
+ * ds.flatMap(_.words.split(" "))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @deprecated("use flatMap() or select() with functions.explode() instead",
"2.0.0")
+ def explode[A <: Product : TypeTag](input: Column*)(f: Row =>
TraversableOnce[A]): DataFrame = {
+ val elementSchema =
ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
+
+ val convert =
CatalystTypeConverters.createToCatalystConverter(elementSchema)
+
+ val rowFunction =
+ f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
+ val generator = UserDefinedGenerator(elementSchema, rowFunction,
input.map(_.expr))
+
+ withPlan {
+ Generate(generator, unrequiredChildIndex = Nil, outer = false,
+ qualifier = None, generatorOutput = Nil, logicalPlan)
+ }
+ }
+
+ /**
+ * (Scala-specific) Returns a new Dataset where a single column has been expanded to zero
+ * or more rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. All
+ * columns of the input row are implicitly joined with each value that is output by the function.
+ *
+ * Given that this is deprecated, as an alternative, you can explode columns either using
+ * `functions.explode()`:
+ *
+ * {{{
+ * ds.select(explode(split('words, " ")).as("word"))
+ * }}}
+ *
+ * or `flatMap()`:
+ *
+ * {{{
+ * ds.flatMap(_.words.split(" "))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @deprecated("use flatMap() or select() with functions.explode() instead",
"2.0.0")
+ def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A
=> TraversableOnce[B])
+ : DataFrame = {
+ val dataType = ScalaReflection.schemaFor[B].dataType
+ val attributes = AttributeReference(outputColumn, dataType)() :: Nil
+ // TODO handle the metadata?
+ val elementSchema = attributes.toStructType
+
+ def rowFunction(row: Row): TraversableOnce[InternalRow] = {
+ val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
+ f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
+ }
+ val generator = UserDefinedGenerator(elementSchema, rowFunction, apply(inputColumn).expr :: Nil)
+
+ withPlan {
+ Generate(generator, unrequiredChildIndex = Nil, outer = false,
+ qualifier = None, generatorOutput = Nil, logicalPlan)
+ }
+ }
+
+ /**
* Returns a new Dataset by adding a column or replacing the existing column that has
* the same name.
*
@@ -3130,6 +3216,18 @@ class Dataset[T] private[sql](
def javaRDD: JavaRDD[T] = toJavaRDD
/**
+ * Registers this Dataset as a temporary table using the given name. The lifetime of this
+ * temporary table is tied to the [[SparkSession]] that was used to create this Dataset.
+ *
+ * @group basic
+ * @since 1.6.0
+ */
+ @deprecated("Use createOrReplaceTempView(viewName) instead.", "2.0.0")
+ def registerTempTable(tableName: String): Unit = {
+ createOrReplaceTempView(tableName)
+ }
+
+ /**
* Creates a local temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bbcc842..68ce82d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,7 +24,7 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
@@ -64,6 +64,15 @@ class SQLContext private[sql](val sparkSession: SparkSession)
// Note: Since Spark 2.0 this class has become a wrapper of SparkSession, where the
// real functionality resides. This class remains mainly for backward compatibility.
+
+ @deprecated("Use SparkSession.builder instead", "2.0.0")
+ def this(sc: SparkContext) = {
+ this(SparkSession.builder().sparkContext(sc).getOrCreate())
+ }
+
+ @deprecated("Use SparkSession.builder instead", "2.0.0")
+ def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
+
// TODO: move this logic into SparkSession
private[sql] def sessionState: SessionState = sparkSession.sessionState
@@ -998,6 +1007,45 @@ class SQLContext private[sql](val sparkSession: SparkSession)
object SQLContext {
/**
+ * Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
+ *
+ * This function can be used to create a singleton SQLContext object that can be shared across
+ * the JVM.
+ *
+ * If there is an active SQLContext for current thread, it will be returned instead of the global
+ * one.
+ *
+ * @since 1.5.0
+ */
+ @deprecated("Use SparkSession.builder instead", "2.0.0")
+ def getOrCreate(sparkContext: SparkContext): SQLContext = {
+ SparkSession.builder().sparkContext(sparkContext).getOrCreate().sqlContext
+ }
+
+ /**
+ * Changes the SQLContext that will be returned in this thread and its children when
+ * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
+ * a SQLContext with an isolated session, instead of the global (first created) context.
+ *
+ * @since 1.6.0
+ */
+ @deprecated("Use SparkSession.setActiveSession instead", "2.0.0")
+ def setActive(sqlContext: SQLContext): Unit = {
+ SparkSession.setActiveSession(sqlContext.sparkSession)
+ }
+
+ /**
+ * Clears the active SQLContext for current thread. Subsequent calls to getOrCreate will
+ * return the first created context instead of a thread-local override.
+ *
+ * @since 1.6.0
+ */
+ @deprecated("Use SparkSession.clearActiveSession instead", "2.0.0")
+ def clearActive(): Unit = {
+ SparkSession.clearActiveSession()
+ }
+
+ /**
* Converts an iterator of Java Beans to InternalRow using the provided
* bean info & schema. This is not related to the singleton, but is a static
* method for internal use.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index cad7916..0ca4238 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -212,6 +212,36 @@ object functions {
//////////////////////////////////////////////////////////////////////////////////////////////
/**
+ * @group agg_funcs
+ * @since 1.3.0
+ */
+ @deprecated("Use approx_count_distinct", "2.1.0")
+ def approxCountDistinct(e: Column): Column = approx_count_distinct(e)
+
+ /**
+ * @group agg_funcs
+ * @since 1.3.0
+ */
+ @deprecated("Use approx_count_distinct", "2.1.0")
+ def approxCountDistinct(columnName: String): Column = approx_count_distinct(columnName)
+
+ /**
+ * @group agg_funcs
+ * @since 1.3.0
+ */
+ @deprecated("Use approx_count_distinct", "2.1.0")
+ def approxCountDistinct(e: Column, rsd: Double): Column = approx_count_distinct(e, rsd)
+
+ /**
+ * @group agg_funcs
+ * @since 1.3.0
+ */
+ @deprecated("Use approx_count_distinct", "2.1.0")
+ def approxCountDistinct(columnName: String, rsd: Double): Column = {
+ approx_count_distinct(Column(columnName), rsd)
+ }
+
+ /**
* Aggregate function: returns the approximate number of distinct items in a group.
*
* @group agg_funcs
@@ -1106,6 +1136,27 @@ object functions {
* }}}
*
* @group normal_funcs
+ * @since 1.4.0
+ */
+ @deprecated("Use monotonically_increasing_id()", "2.0.0")
+ def monotonicallyIncreasingId(): Column = monotonically_increasing_id()
+
+ /**
+ * A column expression that generates monotonically increasing 64-bit integers.
+ *
+ * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
+ * The current implementation puts the partition ID in the upper 31 bits, and the record number
+ * within each partition in the lower 33 bits. The assumption is that the data frame has
+ * less than 1 billion partitions, and each partition has less than 8 billion records.
+ *
+ * As an example, consider a `DataFrame` with two partitions, each with 3 records.
+ * This expression would return the following IDs:
+ *
+ * {{{
+ * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
+ * }}}
+ *
+ * @group normal_funcs
* @since 1.6.0
*/
def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }
@@ -2072,6 +2123,20 @@ object functions {
def tanh(columnName: String): Column = tanh(Column(columnName))
/**
+ * @group math_funcs
+ * @since 1.4.0
+ */
+ @deprecated("Use degrees", "2.1.0")
+ def toDegrees(e: Column): Column = degrees(e)
+
+ /**
+ * @group math_funcs
+ * @since 1.4.0
+ */
+ @deprecated("Use degrees", "2.1.0")
+ def toDegrees(columnName: String): Column = degrees(Column(columnName))
+
+ /**
* Converts an angle measured in radians to an approximately equivalent angle measured in degrees.
*
* @param e angle in radians
@@ -2094,6 +2159,20 @@ object functions {
def degrees(columnName: String): Column = degrees(Column(columnName))
/**
+ * @group math_funcs
+ * @since 1.4.0
+ */
+ @deprecated("Use radians", "2.1.0")
+ def toRadians(e: Column): Column = radians(e)
+
+ /**
+ * @group math_funcs
+ * @since 1.4.0
+ */
+ @deprecated("Use radians", "2.1.0")
+ def toRadians(columnName: String): Column = radians(Column(columnName))
+
+ /**
* Converts an angle measured in degrees to an approximately equivalent angle measured in radians.
*
* @param e angle in degrees
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 72aa7bf..1762bc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -109,6 +109,31 @@ class DataFrameSuite extends QueryTest
dfAlias.col("t2.c")
}
+ test("simple explode") {
+ val df = Seq(Tuple1("a b c"), Tuple1("d e")).toDF("words")
+
+ checkAnswer(
+ df.explode("words", "word") { word: String => word.split(" ").toSeq
}.select('word),
+ Row("a") :: Row("b") :: Row("c") :: Row("d") ::Row("e") :: Nil
+ )
+ }
+
+ test("explode") {
+ val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
+ val df2 =
+ df.explode('letters) {
+ case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
+ }
+
+ checkAnswer(
+ df2
+ .select('_1 as 'letter, 'number)
+ .groupBy('letter)
+ .agg(countDistinct('number)),
+ Row("a", 3) :: Row("b", 2) :: Row("c", 1) :: Nil
+ )
+ }
+
test("Star Expansion - CreateStruct and CreateArray") {
val structDf = testData2.select("a", "b").as("record")
// CreateStruct and CreateArray in aggregateExpressions
@@ -185,6 +210,27 @@ class DataFrameSuite extends QueryTest
}
}
+ test("Star Expansion - ds.explode should fail with a meaningful message if
it takes a star") {
+ val df = Seq(("1", "1,2"), ("2", "4"), ("3", "7,8,9")).toDF("prefix",
"csv")
+ val e = intercept[AnalysisException] {
+ df.explode($"*") { case Row(prefix: String, csv: String) =>
+ csv.split(",").map(v => Tuple1(prefix + ":" + v)).toSeq
+ }.queryExecution.assertAnalyzed()
+ }
+ assert(e.getMessage.contains("Invalid usage of '*' in
explode/json_tuple/UDTF"))
+
+ checkAnswer(
+ df.explode('prefix, 'csv) { case Row(prefix: String, csv: String) =>
+ csv.split(",").map(v => Tuple1(prefix + ":" + v)).toSeq
+ },
+ Row("1", "1,2", "1:1") ::
+ Row("1", "1,2", "1:2") ::
+ Row("2", "4", "2:4") ::
+ Row("3", "7,8,9", "3:7") ::
+ Row("3", "7,8,9", "3:8") ::
+ Row("3", "7,8,9", "3:9") :: Nil)
+ }
+
test("Star Expansion - explode should fail with a meaningful message if it
takes a star") {
val df = Seq(("1,2"), ("4"), ("7,8,9")).toDF("csv")
val e = intercept[AnalysisException] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
index c31ef99..25b8849 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
@@ -17,10 +17,124 @@
package org.apache.spark.sql
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
class DeprecatedAPISuite extends QueryTest with SharedSparkSession {
+ import MathFunctionsTestData.DoubleData
+ import testImplicits._
+
+ private lazy val doubleData = (1 to 10).map(i => DoubleData(i * 0.2 - 1, i * -0.2 + 1)).toDF()
+
+ private def testOneToOneMathFunction[
+ @specialized(Int, Long, Float, Double) T,
+ @specialized(Int, Long, Float, Double) U](
+ c: Column => Column,
+ f: T => U): Unit = {
+ checkAnswer(
+ doubleData.select(c('a)),
+ (1 to 10).map(n => Row(f((n * 0.2 - 1).asInstanceOf[T])))
+ )
+
+ checkAnswer(
+ doubleData.select(c('b)),
+ (1 to 10).map(n => Row(f((-n * 0.2 + 1).asInstanceOf[T])))
+ )
+
+ checkAnswer(
+ doubleData.select(c(lit(null))),
+ (1 to 10).map(_ => Row(null))
+ )
+ }
+
+ test("functions.toDegrees") {
+ testOneToOneMathFunction(toDegrees, math.toDegrees)
+ withView("t") {
+ val df = Seq(0, 1, 1.5).toDF("a")
+ df.createOrReplaceTempView("t")
+
+ checkAnswer(
+ sql("SELECT degrees(0), degrees(1), degrees(1.5)"),
+ Seq(0).toDF().select(toDegrees(lit(0)), toDegrees(lit(1)), toDegrees(lit(1.5)))
+ )
+ checkAnswer(
+ sql("SELECT degrees(a) FROM t"),
+ df.select(toDegrees("a"))
+ )
+ }
+ }
+
+ test("functions.toRadians") {
+ testOneToOneMathFunction(toRadians, math.toRadians)
+ withView("t") {
+ val df = Seq(0, 1, 1.5).toDF("a")
+ df.createOrReplaceTempView("t")
+
+ checkAnswer(
+ sql("SELECT radians(0), radians(1), radians(1.5)"),
+ Seq(0).toDF().select(toRadians(lit(0)), toRadians(lit(1)), toRadians(lit(1.5)))
+ )
+ checkAnswer(
+ sql("SELECT radians(a) FROM t"),
+ df.select(toRadians("a"))
+ )
+ }
+ }
+
+ test("functions.approxCountDistinct") {
+ withView("t") {
+ val df = Seq(0, 1, 2).toDF("a")
+ df.createOrReplaceTempView("t")
+ checkAnswer(
+ sql("SELECT approx_count_distinct(a) FROM t"),
+ df.select(approxCountDistinct("a")))
+ }
+ }
+
+ test("functions.monotonicallyIncreasingId") {
+ // Make sure we have 2 partitions, each with 2 records.
+ val df = sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
+ Iterator(Tuple1(1), Tuple1(2))
+ }.toDF("a")
+ checkAnswer(
+ df.select(monotonicallyIncreasingId(), expr("monotonically_increasing_id()")),
+ Row(0L, 0L) ::
+ Row(1L, 1L) ::
+ Row((1L << 33) + 0L, (1L << 33) + 0L) ::
+ Row((1L << 33) + 1L, (1L << 33) + 1L) :: Nil
+ )
+ }
+
+ test("Column.!==") {
+ val nullData = Seq(
+ (Some(1), Some(1)), (Some(1), Some(2)), (Some(1), None), (None, None)).toDF("a", "b")
+ checkAnswer(
+ nullData.filter($"b" !== 1),
+ Row(1, 2) :: Nil)
+
+ checkAnswer(nullData.filter($"b" !== null), Nil)
+
+ checkAnswer(
+ nullData.filter($"a" !== $"b"),
+ Row(1, 2) :: Nil)
+ }
+
+ test("Dataset.registerTempTable") {
+ withTempView("t") {
+ Seq(1).toDF().registerTempTable("t")
+ assert(spark.catalog.tableExists("t"))
+ }
+ }
+
+ test("SQLContext.setActive/clearActive") {
+ val sc = spark.sparkContext
+ val sqlContext = new SQLContext(sc)
+ SQLContext.setActive(sqlContext)
+ assert(SparkSession.getActiveSession === Some(spark))
+ SQLContext.clearActive()
+ assert(SparkSession.getActiveSession === None)
+ }
test("SQLContext.applySchema") {
val rowRdd = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Marry",
18)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index aab2ae4..a179982 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -24,14 +24,32 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+@deprecated("This suite is deprecated to silent compiler deprecation
warnings", "2.0.0")
class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
object DummyRule extends Rule[LogicalPlan] {
def apply(p: LogicalPlan): LogicalPlan = p
}
+ test("getOrCreate instantiates SQLContext") {
+ val sqlContext = SQLContext.getOrCreate(sc)
+ assert(sqlContext != null, "SQLContext.getOrCreate returned null")
+ assert(SQLContext.getOrCreate(sc).eq(sqlContext),
+ "SQLContext created by SQLContext.getOrCreate not returned by
SQLContext.getOrCreate")
+ }
+
+ test("getOrCreate return the original SQLContext") {
+ val sqlContext = SQLContext.getOrCreate(sc)
+ val newSession = sqlContext.newSession()
+ assert(SQLContext.getOrCreate(sc).eq(sqlContext),
+ "SQLContext.getOrCreate after explicitly created SQLContext did not
return the context")
+ SparkSession.setActiveSession(newSession.sparkSession)
+ assert(SQLContext.getOrCreate(sc).eq(newSession),
+ "SQLContext.getOrCreate after explicitly setActive() did not return the
active context")
+ }
+
test("Sessions of SQLContext") {
- val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+ val sqlContext = SQLContext.getOrCreate(sc)
val session1 = sqlContext.newSession()
val session2 = sqlContext.newSession()
@@ -59,13 +77,13 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
}
test("Catalyst optimization passes are modifiable at runtime") {
- val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+ val sqlContext = SQLContext.getOrCreate(sc)
sqlContext.experimental.extraOptimizations = Seq(DummyRule)
assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
}
test("get all tables") {
- val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+ val sqlContext = SQLContext.getOrCreate(sc)
val df = sqlContext.range(10)
df.createOrReplaceTempView("listtablessuitetable")
assert(
@@ -82,7 +100,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
}
test("getting all tables with a database name has no impact on returned
table names") {
- val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+ val sqlContext = SQLContext.getOrCreate(sc)
val df = sqlContext.range(10)
df.createOrReplaceTempView("listtablessuitetable")
assert(
@@ -99,7 +117,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
}
test("query the returned DataFrame of tables") {
- val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+ val sqlContext = SQLContext.getOrCreate(sc)
val df = sqlContext.range(10)
df.createOrReplaceTempView("listtablessuitetable")
@@ -109,7 +127,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
StructField("isTemporary", BooleanType, false) :: Nil)
Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach {
- tableDF =>
+ case tableDF =>
assert(expectedSchema === tableDF.schema)
tableDF.createOrReplaceTempView("tables")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]