This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new baa93ec66e3 [SPARK-38813][SQL][3.3] Remove TimestampNTZ type support in Spark 3.3
baa93ec66e3 is described below

commit baa93ec66e3068dcbcb52b83fb94101ddaf0a7e4
Author: Gengliang Wang <[email protected]>
AuthorDate: Fri Apr 8 21:00:06 2022 +0800

    [SPARK-38813][SQL][3.3] Remove TimestampNTZ type support in Spark 3.3
    
    ### What changes were proposed in this pull request?
    
    Remove TimestampNTZ type support from the production code of Spark 3.3.
    To achieve this goal, this PR adds the check `Utils.isTesting` in the following code branches:
    - the keywords "timestamp_ntz" and "timestamp_ltz" in the parser
    - the new expressions from https://issues.apache.org/jira/browse/SPARK-35662
    - using java.time.LocalDateTime as the external type for TimestampNTZType
    - `SQLConf.timestampType`, which determines the default timestamp type of Spark SQL
    
    This minimizes the code difference between this branch and master, so that future users won't think TimestampNTZ is already available in Spark 3.3. A sketch of the gating pattern is shown below.
    The downside is that users can still find TimestampNTZType under the package `org.apache.spark.sql.types`; nothing else should be left visible.
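    
    A minimal, self-contained sketch of the gating pattern used throughout this PR (illustrative only; the object and method names are hypothetical, and `isTesting` mirrors what `org.apache.spark.util.Utils.isTesting` checks):
    
    ```scala
    object TimestampNTZGatingSketch {
      // Mirrors Utils.isTesting: true only inside Spark's test harness.
      def isTesting: Boolean =
        sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
    
      // Hypothetical stand-in for the parser branch: the "timestamp_ntz" keyword
      // resolves only while testing, so branch-3.3 production users never see it.
      def resolveTypeName(name: String): Option[String] = name match {
        case "timestamp" => Some("TimestampType")
        case "timestamp_ntz" if isTesting => Some("TimestampNTZType")
        case _ => None
      }
    }
    ```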
    
    ### Why are the changes needed?
    
    The TimestampNTZ project is not finished yet:
    * It lacks Hive metastore support
    * It lacks JDBC support
    * We need to spend time scanning the codebase for any missing support; the current code usages of `TimestampType` far outnumber those of `TimestampNTZType`
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the TimestampNTZ type is not released yet.
    
    ### How was this patch tested?
    
    Unit tests.
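    
    Spark's own test JVMs set the `spark.testing` system property, so `Utils.isTesting` is true there and the gated TimestampNTZ branches stay reachable from the existing unit tests. A hypothetical illustration reusing the sketch above (not an actual suite in this PR):
    
    ```scala
    import org.scalatest.funsuite.AnyFunSuite
    
    class TimestampNTZGatingSketchSuite extends AnyFunSuite {
      test("timestamp_ntz resolves only under spark.testing") {
        // Spark's build sets this property for every test JVM.
        sys.props("spark.testing") = "true"
        assert(TimestampNTZGatingSketch.resolveTypeName("timestamp_ntz")
          .contains("TimestampNTZType"))
      }
    }
    ```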
    
    Closes #36094 from gengliangwang/disableNTZ.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/sql/catalyst/expressions/UnsafeRow.java  |  2 +-
 .../java/org/apache/spark/sql/types/DataTypes.java |  5 ----
 .../main/scala/org/apache/spark/sql/Encoders.scala |  8 ------
 .../sql/catalyst/CatalystTypeConverters.scala      |  4 ++-
 .../spark/sql/catalyst/JavaTypeInference.scala     | 11 +++++---
 .../spark/sql/catalyst/ScalaReflection.scala       | 12 ++++++---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  5 ++++
 .../sql/catalyst/analysis/FunctionRegistry.scala   | 31 +++++++++++++++-------
 .../apache/spark/sql/catalyst/dsl/package.scala    |  4 ---
 .../spark/sql/catalyst/encoders/RowEncoder.scala   | 10 ++++---
 .../catalyst/expressions/datetimeExpressions.scala | 12 ++++-----
 .../spark/sql/catalyst/expressions/literals.scala  | 14 +++++++---
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 10 ++++---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 15 ++++++-----
 .../apache/spark/sql/types/TimestampNTZType.scala  | 11 ++------
 .../parquet/ParquetVectorUpdaterFactory.java       |  6 ++---
 .../scala/org/apache/spark/sql/SQLImplicits.scala  |  3 ---
 .../org/apache/spark/sql/JavaDatasetSuite.java     |  8 ------
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 17 +-----------
 .../apache/spark/sql/DataFrameAggregateSuite.scala | 13 +--------
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  5 ----
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  | 14 +---------
 .../test/scala/org/apache/spark/sql/UDFSuite.scala | 28 -------------------
 23 files changed, 94 insertions(+), 154 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 476201c9a8d..b8abd01d2e1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -91,7 +91,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
           DoubleType,
           DateType,
           TimestampType,
-          TimestampNTZType
+          TimestampNTZType$.MODULE$
         )));
   }
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
index 9454c3c2598..90a2baf1303 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
@@ -54,11 +54,6 @@ public class DataTypes {
    */
   public static final DataType TimestampType = TimestampType$.MODULE$;
 
-  /**
-   * Gets the TimestampNTZType object.
-   */
-  public static final DataType TimestampNTZType = TimestampNTZType$.MODULE$;
-
   /**
    * Gets the CalendarIntervalType object.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index 98d2f1a5712..e6894847209 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -114,14 +114,6 @@ object Encoders {
    */
   def LOCALDATE: Encoder[java.time.LocalDate] = ExpressionEncoder()
 
-  /**
-   * Creates an encoder that serializes instances of the `java.time.LocalDateTime` class
-   * to the internal representation of nullable Catalyst's TimestampNTZType.
-   *
-   * @since 3.3.0
-   */
-  def LOCALDATETIME: Encoder[java.time.LocalDateTime] = ExpressionEncoder()
-
   /**
    * An encoder for nullable timestamp type.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 3e6d31e79b7..a28540a4ce9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.YearMonthIntervalType._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 /**
  * Functions to convert Scala types to Catalyst types and vice versa.
@@ -503,7 +504,8 @@ object CatalystTypeConverters {
     case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
     case t: Timestamp => TimestampConverter.toCatalyst(t)
     case i: Instant => InstantConverter.toCatalyst(i)
-    case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    case l: LocalDateTime if Utils.isTesting => TimestampNTZConverter.toCatalyst(l)
     case d: BigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
     case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
     case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 903072ae29d..1f93933327e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * Type-inference utilities for POJOs and Java collections.
@@ -119,7 +120,9 @@ object JavaTypeInference {
       case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
       case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
       case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
-      case c: Class[_] if c == classOf[java.time.LocalDateTime] => (TimestampNTZType, true)
+      // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+      case c: Class[_] if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
+        (TimestampNTZType, true)
       case c: Class[_] if c == classOf[java.time.Duration] => (DayTimeIntervalType(), true)
       case c: Class[_] if c == classOf[java.time.Period] => (YearMonthIntervalType(), true)
 
@@ -251,7 +254,8 @@ object JavaTypeInference {
       case c if c == classOf[java.sql.Timestamp] =>
         createDeserializerForSqlTimestamp(path)
 
-      case c if c == classOf[java.time.LocalDateTime] =>
+      // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+      case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
         createDeserializerForLocalDateTime(path)
 
       case c if c == classOf[java.time.Duration] =>
@@ -413,7 +417,8 @@ object JavaTypeInference {
 
         case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
 
-        case c if c == classOf[java.time.LocalDateTime] =>
+        // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+        case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
           createSerializerForLocalDateTime(inputObject)
 
         case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index fced82c97b4..e2b624f8e13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -752,7 +753,8 @@ object ScalaReflection extends ScalaReflection {
         Schema(TimestampType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
         Schema(TimestampType, nullable = true)
-      case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) =>
+      // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+      case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) && Utils.isTesting =>
         Schema(TimestampNTZType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.time.LocalDate]) => Schema(DateType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType, nullable = true)
@@ -858,7 +860,9 @@ object ScalaReflection extends ScalaReflection {
     StringType -> classOf[UTF8String],
     DateType -> classOf[DateType.InternalType],
     TimestampType -> classOf[TimestampType.InternalType],
-    TimestampNTZType -> classOf[TimestampNTZType.InternalType],
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    TimestampNTZType ->
+      (if (Utils.isTesting) classOf[TimestampNTZType.InternalType] else classOf[java.lang.Object]),
     BinaryType -> classOf[BinaryType.InternalType],
     CalendarIntervalType -> classOf[CalendarInterval]
   )
@@ -873,7 +877,9 @@ object ScalaReflection extends ScalaReflection {
     DoubleType -> classOf[java.lang.Double],
     DateType -> classOf[java.lang.Integer],
     TimestampType -> classOf[java.lang.Long],
-    TimestampNTZType -> classOf[java.lang.Long]
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    TimestampNTZType ->
+      (if (Utils.isTesting) classOf[java.lang.Long] else classOf[java.lang.Object])
   )
 
   def dataTypeJavaClass(dt: DataType): Class[_] = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c05b9326d23..3b8a73717af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils
 
 /**
  * Throws user facing errors when passed invalid queries that fail to analyze.
@@ -157,6 +158,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case _: ShowTableExtended =>
         throw QueryCompilationErrors.commandUnsupportedInV2TableError("SHOW TABLE EXTENDED")
 
+      case operator: LogicalPlan
+        if !Utils.isTesting && operator.output.exists(_.dataType.isInstanceOf[TimestampNTZType]) =>
+        operator.failAnalysis("TimestampNTZ type is not supported in Spark 3.3.")
+
       case operator: LogicalPlan =>
         // Check argument data types of higher-order functions downwards first.
         // If the arguments of the higher-order functions are resolved but the type check fails,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 5befa779d16..f6bd9891681 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 
 /**
@@ -358,6 +359,26 @@ object FunctionRegistry {
   // then create a `RuntimeReplaceable` expression to call the Java method with `Invoke` or
   // `StaticInvoke` expression. By doing so we don't need to implement codegen for new functions
   // anymore. See `AesEncrypt`/`AesDecrypt` as an example.
+  val expressionsForTimestampNTZSupport: Map[String, (ExpressionInfo, FunctionBuilder)] =
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    if (Utils.isTesting) {
+      Map(
+        expression[LocalTimestamp]("localtimestamp"),
+        expression[ConvertTimezone]("convert_timezone"),
+        // We keep the 2 expression builders below to have different function docs.
+        expressionBuilder(
+          "to_timestamp_ntz", ParseToTimestampNTZExpressionBuilder, setAlias = true),
+        expressionBuilder(
+          "to_timestamp_ltz", ParseToTimestampLTZExpressionBuilder, setAlias = true),
+        // We keep the 2 expression builders below to have different function docs.
+        expressionBuilder("make_timestamp_ntz", MakeTimestampNTZExpressionBuilder, setAlias = true),
+        expressionBuilder("make_timestamp_ltz", MakeTimestampLTZExpressionBuilder, setAlias = true)
+      )
+    } else {
+      Map.empty
+    }
+
+  // Note: Whenever we add a new entry here, make sure we also update ExpressionToSQLSuite
   val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
     // misc non-aggregate functions
     expression[Abs]("abs"),
@@ -580,7 +601,6 @@ object FunctionRegistry {
     expression[CurrentDate]("current_date"),
     expression[CurrentTimestamp]("current_timestamp"),
     expression[CurrentTimeZone]("current_timezone"),
-    expression[LocalTimestamp]("localtimestamp"),
     expression[DateDiff]("datediff"),
     expression[DateAdd]("date_add"),
     expression[DateFormatClass]("date_format"),
@@ -604,9 +624,6 @@ object FunctionRegistry {
     expression[ToBinary]("to_binary"),
     expression[ToUnixTimestamp]("to_unix_timestamp"),
     expression[ToUTCTimestamp]("to_utc_timestamp"),
-    // We keep the 2 expression builders below to have different function docs.
-    expressionBuilder("to_timestamp_ntz", ParseToTimestampNTZExpressionBuilder, setAlias = true),
-    expressionBuilder("to_timestamp_ltz", ParseToTimestampLTZExpressionBuilder, setAlias = true),
     expression[TruncDate]("trunc"),
     expression[TruncTimestamp]("date_trunc"),
     expression[UnixTimestamp]("unix_timestamp"),
@@ -618,9 +635,6 @@ object FunctionRegistry {
     expression[SessionWindow]("session_window"),
     expression[MakeDate]("make_date"),
     expression[MakeTimestamp]("make_timestamp"),
-    // We keep the 2 expression builders below to have different function docs.
-    expressionBuilder("make_timestamp_ntz", MakeTimestampNTZExpressionBuilder, setAlias = true),
-    expressionBuilder("make_timestamp_ltz", MakeTimestampLTZExpressionBuilder, setAlias = true),
     expression[MakeInterval]("make_interval"),
     expression[MakeDTInterval]("make_dt_interval"),
     expression[MakeYMInterval]("make_ym_interval"),
@@ -635,7 +649,6 @@ object FunctionRegistry {
     expression[UnixSeconds]("unix_seconds"),
     expression[UnixMillis]("unix_millis"),
     expression[UnixMicros]("unix_micros"),
-    expression[ConvertTimezone]("convert_timezone"),
 
     // collection functions
     expression[CreateArray]("array"),
@@ -782,7 +795,7 @@ object FunctionRegistry {
     expression[CsvToStructs]("from_csv"),
     expression[SchemaOfCsv]("schema_of_csv"),
     expression[StructsToCsv]("to_csv")
-  )
+  ) ++ expressionsForTimestampNTZSupport
 
   val builtin: SimpleFunctionRegistry = {
     val fr = new SimpleFunctionRegistry
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 62b3ee74407..d47e34b110d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -315,10 +315,6 @@ package object dsl {
       /** Creates a new AttributeReference of type timestamp */
       def timestamp: AttributeReference = AttributeReference(s, TimestampType, nullable = true)()
 
-      /** Creates a new AttributeReference of type timestamp without time zone */
-      def timestampNTZ: AttributeReference =
-        AttributeReference(s, TimestampNTZType, nullable = true)()
-
       /** Creates a new AttributeReference of the day-time interval type */
       def dayTimeInterval(startField: Byte, endField: Byte): AttributeReference = {
         AttributeReference(s, DayTimeIntervalType(startField, endField), nullable = true)()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index d7e497fafa8..7b6865d0af4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A factory for constructing encoders that convert external row to/from the Spark SQL
@@ -112,7 +113,8 @@ object RowEncoder {
         createSerializerForSqlTimestamp(inputObject)
       }
 
-    case TimestampNTZType => createSerializerForLocalDateTime(inputObject)
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting => createSerializerForLocalDateTime(inputObject)
 
     case DateType =>
       if (lenient) {
@@ -243,7 +245,8 @@ object RowEncoder {
       } else {
         ObjectType(classOf[java.sql.Timestamp])
       }
-    case TimestampNTZType =>
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting =>
       ObjectType(classOf[java.time.LocalDateTime])
     case DateType =>
       if (SQLConf.get.datetimeJava8ApiEnabled) {
@@ -301,7 +304,8 @@ object RowEncoder {
         createDeserializerForSqlTimestamp(input)
       }
 
-    case TimestampNTZType =>
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting =>
       createDeserializerForLocalDateTime(input)
 
     case DateType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index fc701d4f817..ff3d898942c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -219,7 +219,7 @@ case class Now() extends CurrentTimestampLike {
        2020-04-25 15:49:11.914
   """,
   group = "datetime_funcs",
-  since = "3.3.0")
+  since = "3.4.0")
 case class LocalTimestamp(timeZoneId: Option[String] = None) extends LeafExpression
   with TimeZoneAwareExpression with CodegenFallback {
 
@@ -1111,7 +1111,7 @@ case class GetTimestamp(
        2016-12-31 00:00:00
   """,
   group = "datetime_funcs",
-  since = "3.3.0")
+  since = "3.4.0")
 // scalastyle:on line.size.limit
 object ParseToTimestampNTZExpressionBuilder extends ExpressionBuilder {
   override def build(funcName: String, expressions: Seq[Expression]): Expression = {
@@ -1148,7 +1148,7 @@ object ParseToTimestampNTZExpressionBuilder extends ExpressionBuilder {
        2016-12-31 00:00:00
   """,
   group = "datetime_funcs",
-  since = "3.3.0")
+  since = "3.4.0")
 // scalastyle:on line.size.limit
 object ParseToTimestampLTZExpressionBuilder extends ExpressionBuilder {
   override def build(funcName: String, expressions: Seq[Expression]): Expression = {
@@ -2440,7 +2440,7 @@ case class MakeDate(
        NULL
   """,
   group = "datetime_funcs",
-  since = "3.3.0")
+  since = "3.4.0")
 // scalastyle:on line.size.limit
 object MakeTimestampNTZExpressionBuilder extends ExpressionBuilder {
   override def build(funcName: String, expressions: Seq[Expression]): Expression = {
@@ -2487,7 +2487,7 @@ object MakeTimestampNTZExpressionBuilder extends ExpressionBuilder {
        NULL
   """,
   group = "datetime_funcs",
-  since = "3.3.0")
+  since = "3.4.0")
 // scalastyle:on line.size.limit
 object MakeTimestampLTZExpressionBuilder extends ExpressionBuilder {
   override def build(funcName: String, expressions: Seq[Expression]): Expression = {
@@ -3015,7 +3015,7 @@ object SubtractDates {
        2021-12-06 00:00:00
   """,
   group = "datetime_funcs",
-  since = "3.3.0")
+  since = "3.4.0")
 // scalastyle:on line.size.limit
 case class ConvertTimezone(
     sourceTz: Expression,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index af10a18e4d1..6262bdef7f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -80,7 +80,9 @@ object Literal {
     case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale))
     case i: Instant => Literal(instantToMicros(i), TimestampType)
     case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
-    case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    case l: LocalDateTime if Utils.isTesting =>
+      Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
     case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
     case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
     case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType())
@@ -120,7 +122,8 @@ object Literal {
     case _ if clz == classOf[Date] => DateType
     case _ if clz == classOf[Instant] => TimestampType
     case _ if clz == classOf[Timestamp] => TimestampType
-    case _ if clz == classOf[LocalDateTime] => TimestampNTZType
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    case _ if clz == classOf[LocalDateTime] && Utils.isTesting => TimestampNTZType
     case _ if clz == classOf[Duration] => DayTimeIntervalType()
     case _ if clz == classOf[Period] => YearMonthIntervalType()
     case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT
@@ -186,7 +189,8 @@ object Literal {
     case dt: DecimalType => Literal(Decimal(0, dt.precision, dt.scale))
     case DateType => create(0, DateType)
     case TimestampType => create(0L, TimestampType)
-    case TimestampNTZType => create(0L, TimestampNTZType)
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting => create(0L, TimestampNTZType)
     case it: DayTimeIntervalType => create(0L, it)
     case it: YearMonthIntervalType => create(0, it)
     case StringType => Literal("")
@@ -208,7 +212,9 @@ object Literal {
       case ByteType => v.isInstanceOf[Byte]
       case ShortType => v.isInstanceOf[Short]
       case IntegerType | DateType | _: YearMonthIntervalType => v.isInstanceOf[Int]
-      case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType =>
+      // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+      case TimestampNTZType if Utils.isTesting => v.isInstanceOf[Long]
+      case LongType | TimestampType | _: DayTimeIntervalType =>
         v.isInstanceOf[Long]
       case FloatType => v.isInstanceOf[Float]
       case DoubleType => v.isInstanceOf[Double]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3a22c5ee745..d334b5780f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils.isTesting
 import org.apache.spark.util.random.RandomSampler
 
 /**
@@ -2203,11 +2204,11 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
           val zoneId = getZoneId(conf.sessionLocalTimeZone)
           val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, DateType))
           specialDate.getOrElse(toLiteral(stringToDate, DateType))
-        case "TIMESTAMP_NTZ" =>
+        case "TIMESTAMP_NTZ" if isTesting =>
           convertSpecialTimestampNTZ(value, getZoneId(conf.sessionLocalTimeZone))
             .map(Literal(_, TimestampNTZType))
             .getOrElse(toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType))
-        case "TIMESTAMP_LTZ" =>
+        case "TIMESTAMP_LTZ" if isTesting =>
           constructTimestampLTZLiteral(value)
         case "TIMESTAMP" =>
           SQLConf.get.timestampType match {
@@ -2658,8 +2659,9 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
       case ("double", Nil) => DoubleType
       case ("date", Nil) => DateType
       case ("timestamp", Nil) => SQLConf.get.timestampType
-      case ("timestamp_ntz", Nil) => TimestampNTZType
-      case ("timestamp_ltz", Nil) => TimestampType
+      // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+      case ("timestamp_ntz", Nil) if isTesting => TimestampNTZType
+      case ("timestamp_ltz", Nil) if isTesting => TimestampType
       case ("string", Nil) => StringType
       case ("character" | "char", length :: Nil) => CharType(length.getText.toInt)
       case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9b7d4aee745..c4ffc844135 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3215,9 +3215,10 @@ object SQLConf {
         s"and type literal. Setting the configuration as ${TimestampTypes.TIMESTAMP_NTZ} will " +
         "use TIMESTAMP WITHOUT TIME ZONE as the default type while putting it as " +
         s"${TimestampTypes.TIMESTAMP_LTZ} will use TIMESTAMP WITH LOCAL TIME ZONE. " +
-        "Before the 3.3.0 release, Spark only supports the TIMESTAMP WITH " +
+        "Before the 3.4.0 release, Spark only supports the TIMESTAMP WITH " +
         "LOCAL TIME ZONE type.")
-      .version("3.3.0")
+      .version("3.4.0")
+      .internal()
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(TimestampTypes.values.map(_.toString))
@@ -4359,12 +4360,14 @@ class SQLConf extends Serializable with Logging {
   def strictIndexOperator: Boolean = ansiEnabled && getConf(ANSI_STRICT_INDEX_OPERATOR)
 
   def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
-    case "TIMESTAMP_LTZ" =>
+    // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal code changes.
+    //              The configuration `TIMESTAMP_TYPE` is only effective for testing in Spark 3.3.
+    case "TIMESTAMP_NTZ" if Utils.isTesting =>
+      TimestampNTZType
+
+    case _ =>
       // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
       TimestampType
-
-    case "TIMESTAMP_NTZ" =>
-      TimestampNTZType
   }
 
   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala
index ef653100e81..d2a1f2c34c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.types
 import scala.math.Ordering
 import scala.reflect.runtime.universe.typeTag
 
-import org.apache.spark.annotation.Unstable
-
 /**
  * The timestamp without time zone type represents a local time in microsecond precision,
  * which is independent of time zone.
@@ -29,10 +27,8 @@ import org.apache.spark.annotation.Unstable
  * To represent an absolute point in time, use `TimestampType` instead.
  *
  * Please use the singleton `DataTypes.TimestampNTZType` to refer the type.
- * @since 3.3.0
  */
-@Unstable
-class TimestampNTZType private() extends DatetimeType {
+private[spark] class TimestampNTZType private() extends DatetimeType {
   /**
    * Internally, a timestamp is stored as the number of microseconds from
    * the epoch of 1970-01-01T00:00:00.000000(Unix system time zero)
@@ -58,8 +54,5 @@ class TimestampNTZType private() extends DatetimeType {
  * the TimestampNTZType class. Otherwise, the companion object would be of type
  * "TimestampNTZType" in byte code. Defined with a private constructor so the companion
  * object is the only possible instantiation.
- *
- * @since 3.3.0
  */
-@Unstable
-case object TimestampNTZType extends TimestampNTZType
+private[spark] case object TimestampNTZType extends TimestampNTZType
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index cce53001ea6..53606f58dcf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -140,7 +140,7 @@ public class ParquetVectorUpdaterFactory {
         }
         break;
       case INT96:
-        if (sparkType == DataTypes.TimestampNTZType) {
+        if (sparkType == TimestampNTZType$.MODULE$) {
           convertErrorForTimestampNTZ(typeName.name());
         } else if (sparkType == DataTypes.TimestampType) {
           final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode);
@@ -197,14 +197,14 @@ public class ParquetVectorUpdaterFactory {
     // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst type is TimestampNTZ.
     // This is to avoid mistakes in reading the timestamp values.
     if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC() &&
-      sparkType == DataTypes.TimestampNTZType) {
+      sparkType == TimestampNTZType$.MODULE$) {
       convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")");
     }
   }
 
   void convertErrorForTimestampNTZ(String parquetType) {
     throw new RuntimeException("Unable to create Parquet converter for data type " +
-      DataTypes.TimestampNTZType.json() + " whose Parquet type is " + parquetType);
+      TimestampNTZType$.MODULE$.json() + " whose Parquet type is " + parquetType);
   }
 
   boolean isUnsignedIntTypeMatched(int bitWidth) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 044231f4b5a..90188cadfd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -82,9 +82,6 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   /** @since 3.0.0 */
   implicit def newLocalDateEncoder: Encoder[java.time.LocalDate] = Encoders.LOCALDATE
 
-  /** @since 3.3.0 */
-  implicit def newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] = Encoders.LOCALDATETIME
-
   /** @since 2.2.0 */
   implicit def newTimeStampEncoder: Encoder[java.sql.Timestamp] = Encoders.TIMESTAMP
 
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 22978fb8c28..87105a4d8a7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -607,14 +607,6 @@ public class JavaDatasetSuite implements Serializable {
     Assert.assertEquals(data, ds.collectAsList());
   }
 
-  @Test
-  public void testLocalDateTimeEncoder() {
-    Encoder<LocalDateTime> encoder = Encoders.LOCALDATETIME();
-    List<LocalDateTime> data = Arrays.asList(LocalDateTime.of(1, 1, 1, 1, 1));
-    Dataset<LocalDateTime> ds = spark.createDataset(data, encoder);
-    Assert.assertEquals(data, ds.collectAsList());
-  }
-
   @Test
   public void testDurationEncoder() {
     Encoder<Duration> encoder = Encoders.DURATION();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 461bbd8987c..b676c26023a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import java.text.SimpleDateFormat
-import java.time.{Duration, LocalDateTime, Period}
+import java.time.{Duration, Period}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -354,21 +354,6 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-36490: Make from_csv/to_csv to handle timestamp_ntz type properly") {
-    val localDT = LocalDateTime.parse("2021-08-12T15:16:23")
-    val df = Seq(localDT).toDF
-    val toCsvDF = df.select(to_csv(struct($"value")) as "csv")
-    checkAnswer(toCsvDF, Row("2021-08-12T15:16:23.000"))
-    val fromCsvDF = toCsvDF
-      .select(
-        from_csv(
-          $"csv",
-          StructType(StructField("a", TimestampNTZType) :: Nil),
-          Map.empty[String, String]) as "value")
-      .selectExpr("value.a")
-    checkAnswer(fromCsvDF, Row(localDT))
-  }
-
   test("SPARK-37326: Handle incorrectly formatted timestamp_ntz values in from_csv") {
     val fromCsvDF = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
       .select(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 425be96d6b8..72e2d77bba6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.time.{Duration, LocalDateTime, Period}
+import java.time.{Duration, Period}
 
 import scala.util.Random
 
@@ -1434,17 +1434,6 @@ class DataFrameAggregateSuite extends QueryTest
     checkAnswer(df2, Row(Period.ofYears(1), 1))
   }
 
-  test("SPARK-36054: Support group by TimestampNTZ column") {
-    val ts1 = "2021-01-01T00:00:00"
-    val ts2 = "2021-01-01T00:00:01"
-    val localDateTime = Seq(ts1, ts1, ts2).map(LocalDateTime.parse)
-    val df = localDateTime.toDF("ts").groupBy("ts").count().orderBy("ts")
-    val expectedSchema =
-      new StructType().add(StructField("ts", TimestampNTZType)).add("count", LongType, false)
-    assert (df.schema == expectedSchema)
-    checkAnswer(df, Seq(Row(LocalDateTime.parse(ts1), 2), Row(LocalDateTime.parse(ts2), 1)))
-  }
-
   test("SPARK-36926: decimal average mistakenly overflow") {
     val df = (1 to 10).map(_ => "9999999999.99").toDF("d")
     val res = df.select($"d".cast("decimal(12, 2)").as("d")).agg(avg($"d").cast("string"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c846441e9e0..cbdf31a6eaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2106,11 +2106,6 @@ class DatasetSuite extends QueryTest
     checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
   }
 
-  test("SPARK-35664: implicit encoder for java.time.LocalDateTime") {
-    val localDateTime = java.time.LocalDateTime.parse("2021-06-08T12:31:58.999999")
-    assert(Seq(localDateTime).toDS().head() === localDateTime)
-  }
-
   test("SPARK-34605: implicit encoder for java.time.Duration") {
     val duration = java.time.Duration.ofMinutes(10)
     assert(spark.range(1).map { _ => duration }.head === duration)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index e18c087a262..c86e1f6e297 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import java.text.SimpleDateFormat
-import java.time.{Duration, LocalDateTime, Period}
+import java.time.{Duration, Period}
 import java.util.Locale
 
 import collection.JavaConverters._
@@ -918,16 +918,4 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
-
-  test("SPARK-36491: Make from_json/to_json to handle timestamp_ntz type properly") {
-    val localDT = LocalDateTime.parse("2021-08-12T15:16:23")
-    val df = Seq(localDT).toDF
-    val toJsonDF = df.select(to_json(map(lit("key"), $"value")) as "json")
-    checkAnswer(toJsonDF, Row("""{"key":"2021-08-12T15:16:23.000"}"""))
-    val fromJsonDF = toJsonDF
-      .select(
-        from_json($"json", StructType(StructField("key", TimestampNTZType) :: Nil)) as "value")
-      .selectExpr("value['key']")
-    checkAnswer(fromJsonDF, Row(localDT))
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index e651459394f..912811bfda7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -848,34 +848,6 @@ class UDFSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-35674: using java.time.LocalDateTime in UDF") {
-    // Regular case
-    val input = Seq(java.time.LocalDateTime.parse("2021-01-01T00:00:00")).toDF("dateTime")
-    val plusYear = udf((l: java.time.LocalDateTime) => l.plusYears(1))
-    val result = input.select(plusYear($"dateTime").as("newDateTime"))
-    checkAnswer(result, Row(java.time.LocalDateTime.parse("2022-01-01T00:00:00")) :: Nil)
-    assert(result.schema === new StructType().add("newDateTime", TimestampNTZType))
-    // UDF produces `null`
-    val nullFunc = udf((_: java.time.LocalDateTime) => null.asInstanceOf[java.time.LocalDateTime])
-    val nullResult = input.select(nullFunc($"dateTime").as("nullDateTime"))
-    checkAnswer(nullResult, Row(null) :: Nil)
-    assert(nullResult.schema === new StructType().add("nullDateTime", TimestampNTZType))
-    // Input parameter of UDF is null
-    val nullInput = Seq(null.asInstanceOf[java.time.LocalDateTime]).toDF("nullDateTime")
-    val constDuration = udf((_: java.time.LocalDateTime) =>
-      java.time.LocalDateTime.parse("2021-01-01T00:00:00"))
-    val constResult = nullInput.select(constDuration($"nullDateTime").as("firstDayOf2021"))
-    checkAnswer(constResult, Row(java.time.LocalDateTime.parse("2021-01-01T00:00:00")) :: Nil)
-    assert(constResult.schema === new StructType().add("firstDayOf2021", TimestampNTZType))
-    // Error in the conversion of UDF result to the internal representation of timestamp without
-    // time zone
-    val overflowFunc = udf((l: java.time.LocalDateTime) => l.plusDays(Long.MaxValue))
-    val e = intercept[SparkException] {
-      input.select(overflowFunc($"dateTime")).collect()
-    }.getCause.getCause
-    assert(e.isInstanceOf[java.lang.ArithmeticException])
-  }
-
   test("SPARK-34663, SPARK-35730: using java.time.Duration in UDF") {
     // Regular case
     val input = Seq(java.time.Duration.ofHours(23)).toDF("d")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
