This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 99eb3ff  [SPARK-36227][SQL][3.2] Remove TimestampNTZ type support in Spark 3.2
99eb3ff is described below

commit 99eb3ff226dd62691211db0ed9184195e702e20c
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Wed Jul 21 09:55:09 2021 -0700

    [SPARK-36227][SQL][3.2] Remove TimestampNTZ type support in Spark 3.2
    
    ### What changes were proposed in this pull request?
    
    Remove TimestampNTZ type support in the production code of Spark 3.2.
    To achieve this goal, this PR adds the check "Utils.isTesting" in the following code branches (a minimal sketch of the gating pattern follows the list):
    - keyword "timestamp_ntz" and "timestamp_ltz" in parser
    - New expressions from https://issues.apache.org/jira/browse/SPARK-35662
    - Using java.time.localDateTime as the external type for TimestampNTZType
    - `SQLConf.timestampType` which determines the default timestamp type of 
Spark SQL.
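
    For illustration, a minimal sketch of the gating pattern used throughout this patch (the helper name below is hypothetical; the real call sites are in the diff):
    ```
    import org.apache.spark.sql.types.{DataType, TimestampNTZType}
    import org.apache.spark.util.Utils  // internal Spark utility used for the test-only gate

    // Hypothetical helper: resolve the "timestamp_ntz" keyword only when running tests,
    // so the type stays invisible to Spark 3.2 end users while the code stays close to master.
    def resolveTimestampNtzKeyword(name: String): Option[DataType] = name match {
      case "timestamp_ntz" if Utils.isTesting => Some(TimestampNTZType)
      case _ => None
    }
    ```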
    
    This is to minimize the code difference from the master branch, so that users won't think TimestampNTZ is already available in Spark 3.2.
    The downside is that users can still find TimestampNTZType under the package `org.apache.spark.sql.types`. There should be nothing left other than this.

    ### Why are the changes needed?
    
    As of now, there are some blockers for delivering the TimestampNTZ project in Spark 3.2:
    
    - In the Hive Thrift server, both TimestampType and TimestampNTZType are mapped to the same timestamp type, which can cause confusion for users.
    - For the Parquet data source, newly written TimestampNTZType Parquet columns will be read as TimestampType by old Spark releases. Also, we need to decide the merged schema for files that mix TimestampType and TimestampNTZType.
    - The type coercion rules for TimestampNTZType are incomplete. For example, what should the data type of the IN clause "IN(Timestamp'2020-01-01 00:00:00', TimestampNtz'2020-01-01 00:00:00')" be?
    - It is tricky to support TimestampNTZType in the JSON/CSV data readers. We need to avoid regressions as much as possible.
    
    There are 10 days left before the expected 3.2 RC date. So, I propose to **release the TimestampNTZ type in Spark 3.3 instead of Spark 3.2**, so that we have enough time to work out careful designs for these issues.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests, plus manual tests from spark-shell to validate that the removed features are gone.
    The new functions should no longer be available:
    ```
    spark.sql("select to_timestamp_ntz'2021-01-01 00:00:00'").show()
    spark.sql("select to_timestamp_ltz'2021-01-01 00:00:00'").show()
    spark.sql("select make_timestamp_ntz(1,1,1,1,1,1)").show()
    spark.sql("select make_timestamp_ltz(1,1,1,1,1,1)").show()
    spark.sql("select localtimestamp()").show()
    ```
    The SQL configuration `spark.sql.timestampType` should not take effect in 3.2:
    ```
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    spark.sql("select make_timestamp(1,1,1,1,1,1)").schema
    spark.sql("select to_timestamp('2021-01-01 00:00:00')").schema
    spark.sql("select timestamp'2021-01-01 00:00:00'").schema
    Seq((1, java.sql.Timestamp.valueOf("2021-01-01 00:00:00"))).toDF("i", "ts").write.partitionBy("ts").parquet("/tmp/test")
    spark.read.parquet("/tmp/test").schema
    ```
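    Since the configuration is effectively a no-op outside testing, all of the above should still report the regular TimestampType. A minimal check (a sketch, assuming a spark-shell session):
    ```
    import org.apache.spark.sql.types.TimestampType

    // The TIMESTAMP literal should still resolve to TimestampType, not TimestampNTZType.
    assert(spark.sql("select timestamp'2021-01-01 00:00:00'").schema.head.dataType == TimestampType)
    ```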
    LocalDateTime is not supported as a built-in external type:
    ```
    Seq(LocalDateTime.now()).toDF()
    org.apache.spark.sql.catalyst.expressions.Literal(java.time.LocalDateTime.now())
    org.apache.spark.sql.catalyst.expressions.Literal(0L, TimestampNTZType)
    ```
    
    Closes #33444 from gengliangwang/banNTZ.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../main/scala/org/apache/spark/sql/Encoders.scala |  8 -------
 .../sql/catalyst/CatalystTypeConverters.scala      |  4 +++-
 .../spark/sql/catalyst/JavaTypeInference.scala     | 11 ++++++---
 .../spark/sql/catalyst/ScalaReflection.scala       | 12 +++++++---
 .../sql/catalyst/analysis/FunctionRegistry.scala   | 22 ++++++++++++-----
 .../apache/spark/sql/catalyst/dsl/package.scala    |  4 ----
 .../spark/sql/catalyst/encoders/RowEncoder.scala   | 10 +++++---
 .../spark/sql/catalyst/expressions/literals.scala  | 14 +++++++----
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 11 +++++----
 .../org/apache/spark/sql/internal/SQLConf.scala    | 15 +++++++-----
 .../scala/org/apache/spark/sql/SQLImplicits.scala  |  3 ---
 .../org/apache/spark/sql/JavaDatasetSuite.java     |  8 -------
 .../apache/spark/sql/DataFrameAggregateSuite.scala | 13 +---------
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  5 ----
 .../test/scala/org/apache/spark/sql/UDFSuite.scala | 28 ----------------------
 15 files changed, 70 insertions(+), 98 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index f23f3c6..e689484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -115,14 +115,6 @@ object Encoders {
   def LOCALDATE: Encoder[java.time.LocalDate] = ExpressionEncoder()
 
   /**
-   * Creates an encoder that serializes instances of the `java.time.LocalDateTime` class
-   * to the internal representation of nullable Catalyst's TimestampNTZType.
-   *
-   * @since 3.2.0
-   */
-  def LOCALDATETIME: Encoder[java.time.LocalDateTime] = ExpressionEncoder()
-
-  /**
    * An encoder for nullable timestamp type.
    *
    * @since 1.6.0
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index f3b8e69..d929897 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.YearMonthIntervalType._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 /**
  * Functions to convert Scala types to Catalyst types and vice versa.
@@ -511,7 +512,8 @@ object CatalystTypeConverters {
     case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
     case t: Timestamp => TimestampConverter.toCatalyst(t)
     case i: Instant => InstantConverter.toCatalyst(i)
-    case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case l: LocalDateTime if Utils.isTesting => TimestampNTZConverter.toCatalyst(l)
     case d: BigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
     case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
     case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 903072a..274b74b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * Type-inference utilities for POJOs and Java collections.
@@ -119,7 +120,9 @@ object JavaTypeInference {
       case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
       case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
       case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
-      case c: Class[_] if c == classOf[java.time.LocalDateTime] => (TimestampNTZType, true)
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case c: Class[_] if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
+        (TimestampNTZType, true)
       case c: Class[_] if c == classOf[java.time.Duration] => (DayTimeIntervalType(), true)
       case c: Class[_] if c == classOf[java.time.Period] => (YearMonthIntervalType(), true)
 
@@ -251,7 +254,8 @@ object JavaTypeInference {
       case c if c == classOf[java.sql.Timestamp] =>
         createDeserializerForSqlTimestamp(path)
 
-      case c if c == classOf[java.time.LocalDateTime] =>
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
         createDeserializerForLocalDateTime(path)
 
       case c if c == classOf[java.time.Duration] =>
@@ -413,7 +417,8 @@ object JavaTypeInference {
 
         case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)

-        case c if c == classOf[java.time.LocalDateTime] =>
+        // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+        case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
           createSerializerForLocalDateTime(inputObject)

         case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 4de7e5c..b4761f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -752,7 +753,8 @@ object ScalaReflection extends ScalaReflection {
         Schema(TimestampType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
         Schema(TimestampType, nullable = true)
-      case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) =>
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) && Utils.isTesting =>
         Schema(TimestampNTZType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.time.LocalDate]) => Schema(DateType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType, nullable = true)
@@ -858,7 +860,9 @@ object ScalaReflection extends ScalaReflection {
     StringType -> classOf[UTF8String],
     DateType -> classOf[DateType.InternalType],
     TimestampType -> classOf[TimestampType.InternalType],
-    TimestampNTZType -> classOf[TimestampNTZType.InternalType],
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    TimestampNTZType ->
+      (if (Utils.isTesting) classOf[TimestampNTZType.InternalType] else classOf[java.lang.Object]),
     BinaryType -> classOf[BinaryType.InternalType],
     CalendarIntervalType -> classOf[CalendarInterval]
   )
@@ -873,7 +877,9 @@ object ScalaReflection extends ScalaReflection {
     DoubleType -> classOf[java.lang.Double],
     DateType -> classOf[java.lang.Integer],
     TimestampType -> classOf[java.lang.Long],
-    TimestampNTZType -> classOf[java.lang.Long]
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    TimestampNTZType ->
+      (if (Utils.isTesting) classOf[java.lang.Long] else classOf[java.lang.Object])
   )
 
   def dataTypeJavaClass(dt: DataType): Class[_] = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 5fce4b6..63b1525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 
 /**
@@ -317,6 +318,20 @@ object FunctionRegistry {
 
   val FUNC_ALIAS = TreeNodeTag[String]("functionAliasName")
 
+  val expressionsForTimestampNTZSupport: Map[String, (ExpressionInfo, FunctionBuilder)] =
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    if (Utils.isTesting) {
+      Map(
+        expression[ParseToTimestampNTZ]("to_timestamp_ntz"),
+        expression[ParseToTimestampLTZ]("to_timestamp_ltz"),
+        expression[MakeTimestampNTZ]("make_timestamp_ntz"),
+        expression[MakeTimestampLTZ]("make_timestamp_ltz"),
+        expression[LocalTimestamp]("localtimestamp")
+      )
+    } else {
+      Map.empty
+    }
+
   // Note: Whenever we add a new entry here, make sure we also update ExpressionToSQLSuite
   val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
     // misc non-aggregate functions
@@ -519,7 +534,6 @@ object FunctionRegistry {
     expression[CurrentDate]("current_date"),
     expression[CurrentTimestamp]("current_timestamp"),
     expression[CurrentTimeZone]("current_timezone"),
-    expression[LocalTimestamp]("localtimestamp"),
     expression[DateDiff]("datediff"),
     expression[DateAdd]("date_add"),
     expression[DateFormatClass]("date_format"),
@@ -542,8 +556,6 @@ object FunctionRegistry {
     expression[ParseToDate]("to_date"),
     expression[ToUnixTimestamp]("to_unix_timestamp"),
     expression[ToUTCTimestamp]("to_utc_timestamp"),
-    expression[ParseToTimestampNTZ]("to_timestamp_ntz"),
-    expression[ParseToTimestampLTZ]("to_timestamp_ltz"),
     expression[TruncDate]("trunc"),
     expression[TruncTimestamp]("date_trunc"),
     expression[UnixTimestamp]("unix_timestamp"),
@@ -555,8 +567,6 @@ object FunctionRegistry {
     expression[SessionWindow]("session_window"),
     expression[MakeDate]("make_date"),
     expression[MakeTimestamp]("make_timestamp"),
-    expression[MakeTimestampNTZ]("make_timestamp_ntz"),
-    expression[MakeTimestampLTZ]("make_timestamp_ltz"),
     expression[MakeInterval]("make_interval"),
     expression[MakeDTInterval]("make_dt_interval"),
     expression[MakeYMInterval]("make_ym_interval"),
@@ -712,7 +722,7 @@ object FunctionRegistry {
     expression[CsvToStructs]("from_csv"),
     expression[SchemaOfCsv]("schema_of_csv"),
     expression[StructsToCsv]("to_csv")
-  )
+  ) ++ expressionsForTimestampNTZSupport
 
   val builtin: SimpleFunctionRegistry = {
     val fr = new SimpleFunctionRegistry
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index d0f63ba..46be6b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -297,10 +297,6 @@ package object dsl {
       /** Creates a new AttributeReference of type timestamp */
       def timestamp: AttributeReference = AttributeReference(s, TimestampType, nullable = true)()

-      /** Creates a new AttributeReference of type timestamp without time zone */
-      def timestampNTZ: AttributeReference =
-        AttributeReference(s, TimestampNTZType, nullable = true)()
-
       /** Creates a new AttributeReference of the day-time interval type */
       def dayTimeInterval(startField: Byte, endField: Byte): AttributeReference = {
         AttributeReference(s, DayTimeIntervalType(startField, endField), nullable = true)()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index d5cfa50..80477d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A factory for constructing encoders that convert external row to/from the Spark SQL
@@ -105,7 +106,8 @@ object RowEncoder {
         createSerializerForSqlTimestamp(inputObject)
       }
 
-    case TimestampNTZType => createSerializerForLocalDateTime(inputObject)
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting => createSerializerForLocalDateTime(inputObject)
 
     case DateType =>
       if (SQLConf.get.datetimeJava8ApiEnabled) {
@@ -230,7 +232,8 @@ object RowEncoder {
       } else {
         ObjectType(classOf[java.sql.Timestamp])
       }
-    case TimestampNTZType =>
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting =>
       ObjectType(classOf[java.time.LocalDateTime])
     case DateType =>
       if (SQLConf.get.datetimeJava8ApiEnabled) {
@@ -287,7 +290,8 @@ object RowEncoder {
         createDeserializerForSqlTimestamp(input)
       }
 
-    case TimestampNTZType =>
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting =>
       createDeserializerForLocalDateTime(input)
 
     case DateType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index ee40909..6b398ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -80,7 +80,9 @@ object Literal {
     case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale))
     case i: Instant => Literal(instantToMicros(i), TimestampType)
     case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
-    case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case l: LocalDateTime if Utils.isTesting =>
+      Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
     case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
     case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
     case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType())
@@ -120,7 +122,8 @@ object Literal {
     case _ if clz == classOf[Date] => DateType
     case _ if clz == classOf[Instant] => TimestampType
     case _ if clz == classOf[Timestamp] => TimestampType
-    case _ if clz == classOf[LocalDateTime] => TimestampNTZType
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case _ if clz == classOf[LocalDateTime] && Utils.isTesting => TimestampNTZType
     case _ if clz == classOf[Duration] => DayTimeIntervalType()
     case _ if clz == classOf[Period] => YearMonthIntervalType()
     case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT
@@ -185,7 +188,8 @@ object Literal {
     case dt: DecimalType => Literal(Decimal(0, dt.precision, dt.scale))
     case DateType => create(0, DateType)
     case TimestampType => create(0L, TimestampType)
-    case TimestampNTZType => create(0L, TimestampNTZType)
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting => create(0L, TimestampNTZType)
     case it: DayTimeIntervalType => create(0L, it)
     case it: YearMonthIntervalType => create(0, it)
     case StringType => Literal("")
@@ -207,7 +211,9 @@ object Literal {
       case ByteType => v.isInstanceOf[Byte]
       case ShortType => v.isInstanceOf[Short]
       case IntegerType | DateType | _: YearMonthIntervalType => v.isInstanceOf[Int]
-      case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType =>
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case TimestampNTZType if Utils.isTesting => v.isInstanceOf[Long]
+      case LongType | TimestampType | _: DayTimeIntervalType =>
         v.isInstanceOf[Long]
       case FloatType => v.isInstanceOf[Float]
       case DoubleType => v.isInstanceOf[Double]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d213549..a431758 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils.isTesting
 import org.apache.spark.util.random.RandomSampler
 
 /**
@@ -2131,10 +2132,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
           val zoneId = getZoneId(conf.sessionLocalTimeZone)
           val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, DateType))
           specialDate.getOrElse(toLiteral(stringToDate, DateType))
-        case "TIMESTAMP_NTZ" =>
+        // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+        case "TIMESTAMP_NTZ" if isTesting =>
           val specialTs = convertSpecialTimestampNTZ(value).map(Literal(_, TimestampNTZType))
           specialTs.getOrElse(toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType))
-        case "TIMESTAMP_LTZ" =>
+        case "TIMESTAMP_LTZ" if isTesting =>
           constructTimestampLTZLiteral(value)
         case "TIMESTAMP" =>
           SQLConf.get.timestampType match {
@@ -2573,8 +2575,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       case ("double", Nil) => DoubleType
       case ("date", Nil) => DateType
       case ("timestamp", Nil) => SQLConf.get.timestampType
-      case ("timestamp_ntz", Nil) => TimestampNTZType
-      case ("timestamp_ltz", Nil) => TimestampType
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case ("timestamp_ntz", Nil) if isTesting => TimestampNTZType
+      case ("timestamp_ltz", Nil) if isTesting => TimestampType
       case ("string", Nil) => StringType
       case ("character" | "char", length :: Nil) => CharType(length.getText.toInt)
       case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 14c4e55..b3557ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2884,9 +2884,10 @@ object SQLConf {
         s"and type literal. Setting the configuration as 
${TimestampTypes.TIMESTAMP_NTZ} will " +
         "use TIMESTAMP WITHOUT TIME ZONE as the default type while putting it 
as " +
         s"${TimestampTypes.TIMESTAMP_LTZ} will use TIMESTAMP WITH LOCAL TIME 
ZONE. " +
-        "Before the 3.2.0 release, Spark only supports the TIMESTAMP WITH " +
+        "Before the 3.3.0 release, Spark only supports the TIMESTAMP WITH " +
         "LOCAL TIME ZONE type.")
-      .version("3.2.0")
+      .version("3.3.0")
+      .internal()
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(TimestampTypes.values.map(_.toString))
@@ -3975,12 +3976,14 @@ class SQLConf extends Serializable with Logging {
   def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
 
   def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
-    case "TIMESTAMP_LTZ" =>
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    //              The configuration `TIMESTAMP_TYPE` is only effective for testing in Spark 3.2.
+    case "TIMESTAMP_NTZ" if Utils.isTesting =>
+      TimestampNTZType
+
+    case _ =>
       // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
       TimestampType
-
-    case "TIMESTAMP_NTZ" =>
-      TimestampNTZType
   }
 
   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index a3004ca..90188ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -82,9 +82,6 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   /** @since 3.0.0 */
   implicit def newLocalDateEncoder: Encoder[java.time.LocalDate] = Encoders.LOCALDATE
 
-  /** @since 3.2.0 */
-  implicit def newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] = Encoders.LOCALDATETIME
-
   /** @since 2.2.0 */
   implicit def newTimeStampEncoder: Encoder[java.sql.Timestamp] = Encoders.TIMESTAMP
 
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 28439f2..9a31616 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -555,14 +555,6 @@ public class JavaDatasetSuite implements Serializable {
   }
 
   @Test
-  public void testLocalDateTimeEncoder() {
-    Encoder<LocalDateTime> encoder = Encoders.LOCALDATETIME();
-    List<LocalDateTime> data = Arrays.asList(LocalDateTime.of(1, 1, 1, 1, 1));
-    Dataset<LocalDateTime> ds = spark.createDataset(data, encoder);
-    Assert.assertEquals(data, ds.collectAsList());
-  }
-
-  @Test
   public void testDurationEncoder() {
     Encoder<Duration> encoder = Encoders.DURATION();
     List<Duration> data = Arrays.asList(Duration.ofDays(0));
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index d0a122e..7a9f980 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.time.{Duration, LocalDateTime, Period}
+import java.time.{Duration, Period}
 
 import scala.util.Random
 
@@ -1416,17 +1416,6 @@ class DataFrameAggregateSuite extends QueryTest
     val df2 = Seq(Period.ofYears(1)).toDF("a").groupBy("a").count()
     checkAnswer(df2, Row(Period.ofYears(1), 1))
   }
-
-  test("SPARK-36054: Support group by TimestampNTZ column") {
-    val ts1 = "2021-01-01T00:00:00"
-    val ts2 = "2021-01-01T00:00:01"
-    val localDateTime = Seq(ts1, ts1, ts2).map(LocalDateTime.parse)
-    val df = localDateTime.toDF("ts").groupBy("ts").count().orderBy("ts")
-    val expectedSchema =
-      new StructType().add(StructField("ts", TimestampNTZType)).add("count", LongType, false)
-    assert (df.schema == expectedSchema)
-    checkAnswer(df, Seq(Row(LocalDateTime.parse(ts1), 2), Row(LocalDateTime.parse(ts2), 1)))
-  }
 }
 
 case class B(c: Option[Double])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5f71390..6706a1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2008,11 +2008,6 @@ class DatasetSuite extends QueryTest
     checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
   }
 
-  test("SPARK-35664: implicit encoder for java.time.LocalDateTime") {
-    val localDateTime = java.time.LocalDateTime.parse("2021-06-08T12:31:58.999999")
-    assert(Seq(localDateTime).toDS().head() === localDateTime)
-  }
-
   test("SPARK-34605: implicit encoder for java.time.Duration") {
     val duration = java.time.Duration.ofMinutes(10)
     assert(spark.range(1).map { _ => duration }.head === duration)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 9d32d6f..aad1060 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -847,34 +847,6 @@ class UDFSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-35674: using java.time.LocalDateTime in UDF") {
-    // Regular case
-    val input = Seq(java.time.LocalDateTime.parse("2021-01-01T00:00:00")).toDF("dateTime")
-    val plusYear = udf((l: java.time.LocalDateTime) => l.plusYears(1))
-    val result = input.select(plusYear($"dateTime").as("newDateTime"))
-    checkAnswer(result, Row(java.time.LocalDateTime.parse("2022-01-01T00:00:00")) :: Nil)
-    assert(result.schema === new StructType().add("newDateTime", TimestampNTZType))
-    // UDF produces `null`
-    val nullFunc = udf((_: java.time.LocalDateTime) => null.asInstanceOf[java.time.LocalDateTime])
-    val nullResult = input.select(nullFunc($"dateTime").as("nullDateTime"))
-    checkAnswer(nullResult, Row(null) :: Nil)
-    assert(nullResult.schema === new StructType().add("nullDateTime", TimestampNTZType))
-    // Input parameter of UDF is null
-    val nullInput = Seq(null.asInstanceOf[java.time.LocalDateTime]).toDF("nullDateTime")
-    val constDuration = udf((_: java.time.LocalDateTime) =>
-      java.time.LocalDateTime.parse("2021-01-01T00:00:00"))
-    val constResult = nullInput.select(constDuration($"nullDateTime").as("firstDayOf2021"))
-    checkAnswer(constResult, Row(java.time.LocalDateTime.parse("2021-01-01T00:00:00")) :: Nil)
-    assert(constResult.schema === new StructType().add("firstDayOf2021", TimestampNTZType))
-    // Error in the conversion of UDF result to the internal representation of timestamp without
-    // time zone
-    val overflowFunc = udf((l: java.time.LocalDateTime) => l.plusDays(Long.MaxValue))
-    val e = intercept[SparkException] {
-      input.select(overflowFunc($"dateTime")).collect()
-    }.getCause.getCause
-    assert(e.isInstanceOf[java.lang.ArithmeticException])
-  }
-
   test("SPARK-34663, SPARK-35730: using java.time.Duration in UDF") {
     // Regular case
     val input = Seq(java.time.Duration.ofHours(23)).toDF("d")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
