This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new baa93ec66e3 [SPARK-38813][SQL][3.3] Remove TimestampNTZ type support
in Spark 3.3
baa93ec66e3 is described below
commit baa93ec66e3068dcbcb52b83fb94101ddaf0a7e4
Author: Gengliang Wang <[email protected]>
AuthorDate: Fri Apr 8 21:00:06 2022 +0800
[SPARK-38813][SQL][3.3] Remove TimestampNTZ type support in Spark 3.3
### What changes were proposed in this pull request?
Remove TimestampNTZ type support in the production code of Spark 3.3.
To achieve the goal, this PR adds the check "Utils.isTesting" in the
following code branches:
- keyword "timestamp_ntz" and "timestamp_ltz" in parser
- New expressions from https://issues.apache.org/jira/browse/SPARK-35662
- Using java.time.LocalDateTime as the external type for TimestampNTZType
- `SQLConf.timestampType` which determines the default timestamp type of
Spark SQL.
This is to minimize the code difference between this branch and the master
branch, so that future users won't think TimestampNTZ is already available in Spark 3.3.
The downside is that users can still find TimestampNTZType under package
`org.apache.spark.sql.types`. There should be nothing left other than this.
### Why are the changes needed?
The TimestampNTZ project is not finished yet:
* Lack Hive metastore support
* Lack JDBC support
* Need to spend time scanning the codebase to find out any missing support.
There are currently more code usages of `TimestampType` than of `TimestampNTZType`.
### Does this PR introduce _any_ user-facing change?
No, the TimestampNTZ type is not released yet.
### How was this patch tested?
UT
Closes #36094 from gengliangwang/disableNTZ.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +-
.../java/org/apache/spark/sql/types/DataTypes.java | 5 ----
.../main/scala/org/apache/spark/sql/Encoders.scala | 8 ------
.../sql/catalyst/CatalystTypeConverters.scala | 4 ++-
.../spark/sql/catalyst/JavaTypeInference.scala | 11 +++++---
.../spark/sql/catalyst/ScalaReflection.scala | 12 ++++++---
.../sql/catalyst/analysis/CheckAnalysis.scala | 5 ++++
.../sql/catalyst/analysis/FunctionRegistry.scala | 31 +++++++++++++++-------
.../apache/spark/sql/catalyst/dsl/package.scala | 4 ---
.../spark/sql/catalyst/encoders/RowEncoder.scala | 10 ++++---
.../catalyst/expressions/datetimeExpressions.scala | 12 ++++-----
.../spark/sql/catalyst/expressions/literals.scala | 14 +++++++---
.../spark/sql/catalyst/parser/AstBuilder.scala | 10 ++++---
.../org/apache/spark/sql/internal/SQLConf.scala | 15 ++++++-----
.../apache/spark/sql/types/TimestampNTZType.scala | 11 ++------
.../parquet/ParquetVectorUpdaterFactory.java | 6 ++---
.../scala/org/apache/spark/sql/SQLImplicits.scala | 3 ---
.../org/apache/spark/sql/JavaDatasetSuite.java | 8 ------
.../org/apache/spark/sql/CsvFunctionsSuite.scala | 17 +-----------
.../apache/spark/sql/DataFrameAggregateSuite.scala | 13 +--------
.../scala/org/apache/spark/sql/DatasetSuite.scala | 5 ----
.../org/apache/spark/sql/JsonFunctionsSuite.scala | 14 +---------
.../test/scala/org/apache/spark/sql/UDFSuite.scala | 28 -------------------
23 files changed, 94 insertions(+), 154 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 476201c9a8d..b8abd01d2e1 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -91,7 +91,7 @@ public final class UnsafeRow extends InternalRow implements
Externalizable, Kryo
DoubleType,
DateType,
TimestampType,
- TimestampNTZType
+ TimestampNTZType$.MODULE$
)));
}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
index 9454c3c2598..90a2baf1303 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
@@ -54,11 +54,6 @@ public class DataTypes {
*/
public static final DataType TimestampType = TimestampType$.MODULE$;
- /**
- * Gets the TimestampNTZType object.
- */
- public static final DataType TimestampNTZType = TimestampNTZType$.MODULE$;
-
/**
* Gets the CalendarIntervalType object.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index 98d2f1a5712..e6894847209 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -114,14 +114,6 @@ object Encoders {
*/
def LOCALDATE: Encoder[java.time.LocalDate] = ExpressionEncoder()
- /**
- * Creates an encoder that serializes instances of the
`java.time.LocalDateTime` class
- * to the internal representation of nullable Catalyst's TimestampNTZType.
- *
- * @since 3.3.0
- */
- def LOCALDATETIME: Encoder[java.time.LocalDateTime] = ExpressionEncoder()
-
/**
* An encoder for nullable timestamp type.
*
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 3e6d31e79b7..a28540a4ce9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
import org.apache.spark.sql.types.YearMonthIntervalType._
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
/**
* Functions to convert Scala types to Catalyst types and vice versa.
@@ -503,7 +504,8 @@ object CatalystTypeConverters {
case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
case t: Timestamp => TimestampConverter.toCatalyst(t)
case i: Instant => InstantConverter.toCatalyst(i)
- case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ case l: LocalDateTime if Utils.isTesting =>
TimestampNTZConverter.toCatalyst(l)
case d: BigDecimal => new DecimalConverter(DecimalType(d.precision,
d.scale)).toCatalyst(d)
case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision,
d.scale)).toCatalyst(d)
case seq: Seq[Any] => new
GenericArrayData(seq.map(convertToCatalyst).toArray)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 903072ae29d..1f93933327e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
* Type-inference utilities for POJOs and Java collections.
@@ -119,7 +120,9 @@ object JavaTypeInference {
case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType,
true)
case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType,
true)
- case c: Class[_] if c == classOf[java.time.LocalDateTime] =>
(TimestampNTZType, true)
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with
minimal code changes.
+ case c: Class[_] if c == classOf[java.time.LocalDateTime] &&
Utils.isTesting =>
+ (TimestampNTZType, true)
case c: Class[_] if c == classOf[java.time.Duration] =>
(DayTimeIntervalType(), true)
case c: Class[_] if c == classOf[java.time.Period] =>
(YearMonthIntervalType(), true)
@@ -251,7 +254,8 @@ object JavaTypeInference {
case c if c == classOf[java.sql.Timestamp] =>
createDeserializerForSqlTimestamp(path)
- case c if c == classOf[java.time.LocalDateTime] =>
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with
minimal code changes.
+ case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
createDeserializerForLocalDateTime(path)
case c if c == classOf[java.time.Duration] =>
@@ -413,7 +417,8 @@ object JavaTypeInference {
case c if c == classOf[java.sql.Timestamp] =>
createSerializerForSqlTimestamp(inputObject)
- case c if c == classOf[java.time.LocalDateTime] =>
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with
minimal code changes.
+ case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
createSerializerForLocalDateTime(inputObject)
case c if c == classOf[java.time.LocalDate] =>
createSerializerForJavaLocalDate(inputObject)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index fced82c97b4..e2b624f8e13 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils
/**
@@ -752,7 +753,8 @@ object ScalaReflection extends ScalaReflection {
Schema(TimestampType, nullable = true)
case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
Schema(TimestampType, nullable = true)
- case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) =>
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with
minimal code changes.
+ case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) &&
Utils.isTesting =>
Schema(TimestampNTZType, nullable = true)
case t if isSubtype(t, localTypeOf[java.time.LocalDate]) =>
Schema(DateType, nullable = true)
case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType,
nullable = true)
@@ -858,7 +860,9 @@ object ScalaReflection extends ScalaReflection {
StringType -> classOf[UTF8String],
DateType -> classOf[DateType.InternalType],
TimestampType -> classOf[TimestampType.InternalType],
- TimestampNTZType -> classOf[TimestampNTZType.InternalType],
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ TimestampNTZType ->
+ (if (Utils.isTesting) classOf[TimestampNTZType.InternalType] else
classOf[java.lang.Object]),
BinaryType -> classOf[BinaryType.InternalType],
CalendarIntervalType -> classOf[CalendarInterval]
)
@@ -873,7 +877,9 @@ object ScalaReflection extends ScalaReflection {
DoubleType -> classOf[java.lang.Double],
DateType -> classOf[java.lang.Integer],
TimestampType -> classOf[java.lang.Long],
- TimestampNTZType -> classOf[java.lang.Long]
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ TimestampNTZType ->
+ (if (Utils.isTesting) classOf[java.lang.Long] else
classOf[java.lang.Object])
)
def dataTypeJavaClass(dt: DataType): Class[_] = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c05b9326d23..3b8a73717af 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils
/**
* Throws user facing errors when passed invalid queries that fail to analyze.
@@ -157,6 +158,10 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
case _: ShowTableExtended =>
throw QueryCompilationErrors.commandUnsupportedInV2TableError("SHOW
TABLE EXTENDED")
+ case operator: LogicalPlan
+ if !Utils.isTesting &&
operator.output.exists(_.dataType.isInstanceOf[TimestampNTZType]) =>
+ operator.failAnalysis("TimestampNTZ type is not supported in Spark
3.3.")
+
case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the
type check fails,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 5befa779d16..f6bd9891681 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -34,6 +34,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
@@ -358,6 +359,26 @@ object FunctionRegistry {
// then create a `RuntimeReplaceable` expression to call the Java method
with `Invoke` or
// `StaticInvoke` expression. By doing so we don't need to implement codegen
for new functions
// anymore. See `AesEncrypt`/`AesDecrypt` as an example.
+ val expressionsForTimestampNTZSupport: Map[String, (ExpressionInfo,
FunctionBuilder)] =
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ if (Utils.isTesting) {
+ Map(
+ expression[LocalTimestamp]("localtimestamp"),
+ expression[ConvertTimezone]("convert_timezone"),
+ // We keep the 2 expression builders below to have different function
docs.
+ expressionBuilder(
+ "to_timestamp_ntz", ParseToTimestampNTZExpressionBuilder, setAlias =
true),
+ expressionBuilder(
+ "to_timestamp_ltz", ParseToTimestampLTZExpressionBuilder, setAlias =
true),
+ // We keep the 2 expression builders below to have different function
docs.
+ expressionBuilder("make_timestamp_ntz",
MakeTimestampNTZExpressionBuilder, setAlias = true),
+ expressionBuilder("make_timestamp_ltz",
MakeTimestampLTZExpressionBuilder, setAlias = true)
+ )
+ } else {
+ Map.empty
+ }
+
+ // Note: Whenever we add a new entry here, make sure we also update
ExpressionToSQLSuite
val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
// misc non-aggregate functions
expression[Abs]("abs"),
@@ -580,7 +601,6 @@ object FunctionRegistry {
expression[CurrentDate]("current_date"),
expression[CurrentTimestamp]("current_timestamp"),
expression[CurrentTimeZone]("current_timezone"),
- expression[LocalTimestamp]("localtimestamp"),
expression[DateDiff]("datediff"),
expression[DateAdd]("date_add"),
expression[DateFormatClass]("date_format"),
@@ -604,9 +624,6 @@ object FunctionRegistry {
expression[ToBinary]("to_binary"),
expression[ToUnixTimestamp]("to_unix_timestamp"),
expression[ToUTCTimestamp]("to_utc_timestamp"),
- // We keep the 2 expression builders below to have different function docs.
- expressionBuilder("to_timestamp_ntz",
ParseToTimestampNTZExpressionBuilder, setAlias = true),
- expressionBuilder("to_timestamp_ltz",
ParseToTimestampLTZExpressionBuilder, setAlias = true),
expression[TruncDate]("trunc"),
expression[TruncTimestamp]("date_trunc"),
expression[UnixTimestamp]("unix_timestamp"),
@@ -618,9 +635,6 @@ object FunctionRegistry {
expression[SessionWindow]("session_window"),
expression[MakeDate]("make_date"),
expression[MakeTimestamp]("make_timestamp"),
- // We keep the 2 expression builders below to have different function docs.
- expressionBuilder("make_timestamp_ntz", MakeTimestampNTZExpressionBuilder,
setAlias = true),
- expressionBuilder("make_timestamp_ltz", MakeTimestampLTZExpressionBuilder,
setAlias = true),
expression[MakeInterval]("make_interval"),
expression[MakeDTInterval]("make_dt_interval"),
expression[MakeYMInterval]("make_ym_interval"),
@@ -635,7 +649,6 @@ object FunctionRegistry {
expression[UnixSeconds]("unix_seconds"),
expression[UnixMillis]("unix_millis"),
expression[UnixMicros]("unix_micros"),
- expression[ConvertTimezone]("convert_timezone"),
// collection functions
expression[CreateArray]("array"),
@@ -782,7 +795,7 @@ object FunctionRegistry {
expression[CsvToStructs]("from_csv"),
expression[SchemaOfCsv]("schema_of_csv"),
expression[StructsToCsv]("to_csv")
- )
+ ) ++ expressionsForTimestampNTZSupport
val builtin: SimpleFunctionRegistry = {
val fr = new SimpleFunctionRegistry
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 62b3ee74407..d47e34b110d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -315,10 +315,6 @@ package object dsl {
/** Creates a new AttributeReference of type timestamp */
def timestamp: AttributeReference = AttributeReference(s, TimestampType,
nullable = true)()
- /** Creates a new AttributeReference of type timestamp without time zone
*/
- def timestampNTZ: AttributeReference =
- AttributeReference(s, TimestampNTZType, nullable = true)()
-
/** Creates a new AttributeReference of the day-time interval type */
def dayTimeInterval(startField: Byte, endField: Byte):
AttributeReference = {
AttributeReference(s, DayTimeIntervalType(startField, endField),
nullable = true)()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index d7e497fafa8..7b6865d0af4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData,
ArrayData}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
* A factory for constructing encoders that convert external row to/from the
Spark SQL
@@ -112,7 +113,8 @@ object RowEncoder {
createSerializerForSqlTimestamp(inputObject)
}
- case TimestampNTZType => createSerializerForLocalDateTime(inputObject)
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ case TimestampNTZType if Utils.isTesting =>
createSerializerForLocalDateTime(inputObject)
case DateType =>
if (lenient) {
@@ -243,7 +245,8 @@ object RowEncoder {
} else {
ObjectType(classOf[java.sql.Timestamp])
}
- case TimestampNTZType =>
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ case TimestampNTZType if Utils.isTesting =>
ObjectType(classOf[java.time.LocalDateTime])
case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
@@ -301,7 +304,8 @@ object RowEncoder {
createDeserializerForSqlTimestamp(input)
}
- case TimestampNTZType =>
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ case TimestampNTZType if Utils.isTesting =>
createDeserializerForLocalDateTime(input)
case DateType =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index fc701d4f817..ff3d898942c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -219,7 +219,7 @@ case class Now() extends CurrentTimestampLike {
2020-04-25 15:49:11.914
""",
group = "datetime_funcs",
- since = "3.3.0")
+ since = "3.4.0")
case class LocalTimestamp(timeZoneId: Option[String] = None) extends
LeafExpression
with TimeZoneAwareExpression with CodegenFallback {
@@ -1111,7 +1111,7 @@ case class GetTimestamp(
2016-12-31 00:00:00
""",
group = "datetime_funcs",
- since = "3.3.0")
+ since = "3.4.0")
// scalastyle:on line.size.limit
object ParseToTimestampNTZExpressionBuilder extends ExpressionBuilder {
override def build(funcName: String, expressions: Seq[Expression]):
Expression = {
@@ -1148,7 +1148,7 @@ object ParseToTimestampNTZExpressionBuilder extends
ExpressionBuilder {
2016-12-31 00:00:00
""",
group = "datetime_funcs",
- since = "3.3.0")
+ since = "3.4.0")
// scalastyle:on line.size.limit
object ParseToTimestampLTZExpressionBuilder extends ExpressionBuilder {
override def build(funcName: String, expressions: Seq[Expression]):
Expression = {
@@ -2440,7 +2440,7 @@ case class MakeDate(
NULL
""",
group = "datetime_funcs",
- since = "3.3.0")
+ since = "3.4.0")
// scalastyle:on line.size.limit
object MakeTimestampNTZExpressionBuilder extends ExpressionBuilder {
override def build(funcName: String, expressions: Seq[Expression]):
Expression = {
@@ -2487,7 +2487,7 @@ object MakeTimestampNTZExpressionBuilder extends
ExpressionBuilder {
NULL
""",
group = "datetime_funcs",
- since = "3.3.0")
+ since = "3.4.0")
// scalastyle:on line.size.limit
object MakeTimestampLTZExpressionBuilder extends ExpressionBuilder {
override def build(funcName: String, expressions: Seq[Expression]):
Expression = {
@@ -3015,7 +3015,7 @@ object SubtractDates {
2021-12-06 00:00:00
""",
group = "datetime_funcs",
- since = "3.3.0")
+ since = "3.4.0")
// scalastyle:on line.size.limit
case class ConvertTimezone(
sourceTz: Expression,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index af10a18e4d1..6262bdef7f7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -80,7 +80,9 @@ object Literal {
case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale),
d.scale))
case i: Instant => Literal(instantToMicros(i), TimestampType)
case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t),
TimestampType)
- case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l),
TimestampNTZType)
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ case l: LocalDateTime if Utils.isTesting =>
+ Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType())
@@ -120,7 +122,8 @@ object Literal {
case _ if clz == classOf[Date] => DateType
case _ if clz == classOf[Instant] => TimestampType
case _ if clz == classOf[Timestamp] => TimestampType
- case _ if clz == classOf[LocalDateTime] => TimestampNTZType
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ case _ if clz == classOf[LocalDateTime] && Utils.isTesting =>
TimestampNTZType
case _ if clz == classOf[Duration] => DayTimeIntervalType()
case _ if clz == classOf[Period] => YearMonthIntervalType()
case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT
@@ -186,7 +189,8 @@ object Literal {
case dt: DecimalType => Literal(Decimal(0, dt.precision, dt.scale))
case DateType => create(0, DateType)
case TimestampType => create(0L, TimestampType)
- case TimestampNTZType => create(0L, TimestampNTZType)
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ case TimestampNTZType if Utils.isTesting => create(0L, TimestampNTZType)
case it: DayTimeIntervalType => create(0L, it)
case it: YearMonthIntervalType => create(0, it)
case StringType => Literal("")
@@ -208,7 +212,9 @@ object Literal {
case ByteType => v.isInstanceOf[Byte]
case ShortType => v.isInstanceOf[Short]
case IntegerType | DateType | _: YearMonthIntervalType =>
v.isInstanceOf[Int]
- case LongType | TimestampType | TimestampNTZType | _:
DayTimeIntervalType =>
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with
minimal code changes.
+ case TimestampNTZType if Utils.isTesting => v.isInstanceOf[Long]
+ case LongType | TimestampType | _: DayTimeIntervalType =>
v.isInstanceOf[Long]
case FloatType => v.isInstanceOf[Float]
case DoubleType => v.isInstanceOf[Double]
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3a22c5ee745..d334b5780f7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.errors.QueryParsingErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils.isTesting
import org.apache.spark.util.random.RandomSampler
/**
@@ -2203,11 +2204,11 @@ class AstBuilder extends
SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val zoneId = getZoneId(conf.sessionLocalTimeZone)
val specialDate = convertSpecialDate(value, zoneId).map(Literal(_,
DateType))
specialDate.getOrElse(toLiteral(stringToDate, DateType))
- case "TIMESTAMP_NTZ" =>
+ case "TIMESTAMP_NTZ" if isTesting =>
convertSpecialTimestampNTZ(value,
getZoneId(conf.sessionLocalTimeZone))
.map(Literal(_, TimestampNTZType))
.getOrElse(toLiteral(stringToTimestampWithoutTimeZone,
TimestampNTZType))
- case "TIMESTAMP_LTZ" =>
+ case "TIMESTAMP_LTZ" if isTesting =>
constructTimestampLTZLiteral(value)
case "TIMESTAMP" =>
SQLConf.get.timestampType match {
@@ -2658,8 +2659,9 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef]
with SQLConfHelper wit
case ("double", Nil) => DoubleType
case ("date", Nil) => DateType
case ("timestamp", Nil) => SQLConf.get.timestampType
- case ("timestamp_ntz", Nil) => TimestampNTZType
- case ("timestamp_ltz", Nil) => TimestampType
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with
minimal code changes.
+ case ("timestamp_ntz", Nil) if isTesting => TimestampNTZType
+ case ("timestamp_ltz", Nil) if isTesting => TimestampType
case ("string", Nil) => StringType
case ("character" | "char", length :: Nil) =>
CharType(length.getText.toInt)
case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9b7d4aee745..c4ffc844135 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3215,9 +3215,10 @@ object SQLConf {
s"and type literal. Setting the configuration as
${TimestampTypes.TIMESTAMP_NTZ} will " +
"use TIMESTAMP WITHOUT TIME ZONE as the default type while putting it
as " +
s"${TimestampTypes.TIMESTAMP_LTZ} will use TIMESTAMP WITH LOCAL TIME
ZONE. " +
- "Before the 3.3.0 release, Spark only supports the TIMESTAMP WITH " +
+ "Before the 3.4.0 release, Spark only supports the TIMESTAMP WITH " +
"LOCAL TIME ZONE type.")
- .version("3.3.0")
+ .version("3.4.0")
+ .internal()
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(TimestampTypes.values.map(_.toString))
@@ -4359,12 +4360,14 @@ class SQLConf extends Serializable with Logging {
def strictIndexOperator: Boolean = ansiEnabled &&
getConf(ANSI_STRICT_INDEX_OPERATOR)
def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
- case "TIMESTAMP_LTZ" =>
+ // SPARK-38813: Remove TimestampNTZ type support in Spark 3.3 with minimal
code changes.
+ // The configuration `TIMESTAMP_TYPE` is only effective for
testing in Spark 3.3.
+ case "TIMESTAMP_NTZ" if Utils.isTesting =>
+ TimestampNTZType
+
+ case _ =>
// For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL
TIME ZONE
TimestampType
-
- case "TIMESTAMP_NTZ" =>
- TimestampNTZType
}
def nestedSchemaPruningEnabled: Boolean =
getConf(NESTED_SCHEMA_PRUNING_ENABLED)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala
index ef653100e81..d2a1f2c34c1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.types
import scala.math.Ordering
import scala.reflect.runtime.universe.typeTag
-import org.apache.spark.annotation.Unstable
-
/**
* The timestamp without time zone type represents a local time in microsecond
precision,
* which is independent of time zone.
@@ -29,10 +27,8 @@ import org.apache.spark.annotation.Unstable
* To represent an absolute point in time, use `TimestampType` instead.
*
* Please use the singleton `DataTypes.TimestampNTZType` to refer the type.
- * @since 3.3.0
*/
-@Unstable
-class TimestampNTZType private() extends DatetimeType {
+private[spark] class TimestampNTZType private() extends DatetimeType {
/**
* Internally, a timestamp is stored as the number of microseconds from
* the epoch of 1970-01-01T00:00:00.000000(Unix system time zero)
@@ -58,8 +54,5 @@ class TimestampNTZType private() extends DatetimeType {
* the TimestampNTZType class. Otherwise, the companion object would be of type
* "TimestampNTZType" in byte code. Defined with a private constructor so the
companion
* object is the only possible instantiation.
- *
- * @since 3.3.0
*/
-@Unstable
-case object TimestampNTZType extends TimestampNTZType
+private[spark] case object TimestampNTZType extends TimestampNTZType
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index cce53001ea6..53606f58dcf 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -140,7 +140,7 @@ public class ParquetVectorUpdaterFactory {
}
break;
case INT96:
- if (sparkType == DataTypes.TimestampNTZType) {
+ if (sparkType == TimestampNTZType$.MODULE$) {
convertErrorForTimestampNTZ(typeName.name());
} else if (sparkType == DataTypes.TimestampType) {
final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode);
@@ -197,14 +197,14 @@ public class ParquetVectorUpdaterFactory {
// Throw an exception if the Parquet type is TimestampLTZ and the Catalyst
type is TimestampNTZ.
// This is to avoid mistakes in reading the timestamp values.
if (((TimestampLogicalTypeAnnotation)
logicalTypeAnnotation).isAdjustedToUTC() &&
- sparkType == DataTypes.TimestampNTZType) {
+ sparkType == TimestampNTZType$.MODULE$) {
convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")");
}
}
void convertErrorForTimestampNTZ(String parquetType) {
throw new RuntimeException("Unable to create Parquet converter for data
type " +
- DataTypes.TimestampNTZType.json() + " whose Parquet type is " +
parquetType);
+ TimestampNTZType$.MODULE$.json() + " whose Parquet type is " +
parquetType);
}
boolean isUnsignedIntTypeMatched(int bitWidth) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 044231f4b5a..90188cadfd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -82,9 +82,6 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
/** @since 3.0.0 */
implicit def newLocalDateEncoder: Encoder[java.time.LocalDate] =
Encoders.LOCALDATE
- /** @since 3.3.0 */
- implicit def newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] =
Encoders.LOCALDATETIME
-
/** @since 2.2.0 */
implicit def newTimeStampEncoder: Encoder[java.sql.Timestamp] =
Encoders.TIMESTAMP
diff --git
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 22978fb8c28..87105a4d8a7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -607,14 +607,6 @@ public class JavaDatasetSuite implements Serializable {
Assert.assertEquals(data, ds.collectAsList());
}
- @Test
- public void testLocalDateTimeEncoder() {
- Encoder<LocalDateTime> encoder = Encoders.LOCALDATETIME();
- List<LocalDateTime> data = Arrays.asList(LocalDateTime.of(1, 1, 1, 1, 1));
- Dataset<LocalDateTime> ds = spark.createDataset(data, encoder);
- Assert.assertEquals(data, ds.collectAsList());
- }
-
@Test
public void testDurationEncoder() {
Encoder<Duration> encoder = Encoders.DURATION();
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 461bbd8987c..b676c26023a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql
import java.text.SimpleDateFormat
-import java.time.{Duration, LocalDateTime, Period}
+import java.time.{Duration, Period}
import java.util.Locale
import scala.collection.JavaConverters._
@@ -354,21 +354,6 @@ class CsvFunctionsSuite extends QueryTest with
SharedSparkSession {
}
}
- test("SPARK-36490: Make from_csv/to_csv to handle timestamp_ntz type
properly") {
- val localDT = LocalDateTime.parse("2021-08-12T15:16:23")
- val df = Seq(localDT).toDF
- val toCsvDF = df.select(to_csv(struct($"value")) as "csv")
- checkAnswer(toCsvDF, Row("2021-08-12T15:16:23.000"))
- val fromCsvDF = toCsvDF
- .select(
- from_csv(
- $"csv",
- StructType(StructField("a", TimestampNTZType) :: Nil),
- Map.empty[String, String]) as "value")
- .selectExpr("value.a")
- checkAnswer(fromCsvDF, Row(localDT))
- }
-
test("SPARK-37326: Handle incorrectly formatted timestamp_ntz values in
from_csv") {
val fromCsvDF = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
.select(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 425be96d6b8..72e2d77bba6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import java.time.{Duration, LocalDateTime, Period}
+import java.time.{Duration, Period}
import scala.util.Random
@@ -1434,17 +1434,6 @@ class DataFrameAggregateSuite extends QueryTest
checkAnswer(df2, Row(Period.ofYears(1), 1))
}
- test("SPARK-36054: Support group by TimestampNTZ column") {
- val ts1 = "2021-01-01T00:00:00"
- val ts2 = "2021-01-01T00:00:01"
- val localDateTime = Seq(ts1, ts1, ts2).map(LocalDateTime.parse)
- val df = localDateTime.toDF("ts").groupBy("ts").count().orderBy("ts")
- val expectedSchema =
- new StructType().add(StructField("ts", TimestampNTZType)).add("count",
LongType, false)
- assert (df.schema == expectedSchema)
- checkAnswer(df, Seq(Row(LocalDateTime.parse(ts1), 2),
Row(LocalDateTime.parse(ts2), 1)))
- }
-
test("SPARK-36926: decimal average mistakenly overflow") {
val df = (1 to 10).map(_ => "9999999999.99").toDF("d")
val res = df.select($"d".cast("decimal(12,
2)").as("d")).agg(avg($"d").cast("string"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c846441e9e0..cbdf31a6eaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2106,11 +2106,6 @@ class DatasetSuite extends QueryTest
checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) ::
Nil)
}
- test("SPARK-35664: implicit encoder for java.time.LocalDateTime") {
- val localDateTime =
java.time.LocalDateTime.parse("2021-06-08T12:31:58.999999")
- assert(Seq(localDateTime).toDS().head() === localDateTime)
- }
-
test("SPARK-34605: implicit encoder for java.time.Duration") {
val duration = java.time.Duration.ofMinutes(10)
assert(spark.range(1).map { _ => duration }.head === duration)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index e18c087a262..c86e1f6e297 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql
import java.text.SimpleDateFormat
-import java.time.{Duration, LocalDateTime, Period}
+import java.time.{Duration, Period}
import java.util.Locale
import collection.JavaConverters._
@@ -918,16 +918,4 @@ class JsonFunctionsSuite extends QueryTest with
SharedSparkSession {
}
}
}
-
- test("SPARK-36491: Make from_json/to_json to handle timestamp_ntz type
properly") {
- val localDT = LocalDateTime.parse("2021-08-12T15:16:23")
- val df = Seq(localDT).toDF
- val toJsonDF = df.select(to_json(map(lit("key"), $"value")) as "json")
- checkAnswer(toJsonDF, Row("""{"key":"2021-08-12T15:16:23.000"}"""))
- val fromJsonDF = toJsonDF
- .select(
- from_json($"json", StructType(StructField("key", TimestampNTZType) ::
Nil)) as "value")
- .selectExpr("value['key']")
- checkAnswer(fromJsonDF, Row(localDT))
- }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index e651459394f..912811bfda7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -848,34 +848,6 @@ class UDFSuite extends QueryTest with SharedSparkSession {
}
}
- test("SPARK-35674: using java.time.LocalDateTime in UDF") {
- // Regular case
- val input =
Seq(java.time.LocalDateTime.parse("2021-01-01T00:00:00")).toDF("dateTime")
- val plusYear = udf((l: java.time.LocalDateTime) => l.plusYears(1))
- val result = input.select(plusYear($"dateTime").as("newDateTime"))
- checkAnswer(result,
Row(java.time.LocalDateTime.parse("2022-01-01T00:00:00")) :: Nil)
- assert(result.schema === new StructType().add("newDateTime",
TimestampNTZType))
- // UDF produces `null`
- val nullFunc = udf((_: java.time.LocalDateTime) =>
null.asInstanceOf[java.time.LocalDateTime])
- val nullResult = input.select(nullFunc($"dateTime").as("nullDateTime"))
- checkAnswer(nullResult, Row(null) :: Nil)
- assert(nullResult.schema === new StructType().add("nullDateTime",
TimestampNTZType))
- // Input parameter of UDF is null
- val nullInput =
Seq(null.asInstanceOf[java.time.LocalDateTime]).toDF("nullDateTime")
- val constDuration = udf((_: java.time.LocalDateTime) =>
- java.time.LocalDateTime.parse("2021-01-01T00:00:00"))
- val constResult =
nullInput.select(constDuration($"nullDateTime").as("firstDayOf2021"))
- checkAnswer(constResult,
Row(java.time.LocalDateTime.parse("2021-01-01T00:00:00")) :: Nil)
- assert(constResult.schema === new StructType().add("firstDayOf2021",
TimestampNTZType))
- // Error in the conversion of UDF result to the internal representation of
timestamp without
- // time zone
- val overflowFunc = udf((l: java.time.LocalDateTime) =>
l.plusDays(Long.MaxValue))
- val e = intercept[SparkException] {
- input.select(overflowFunc($"dateTime")).collect()
- }.getCause.getCause
- assert(e.isInstanceOf[java.lang.ArithmeticException])
- }
-
test("SPARK-34663, SPARK-35730: using java.time.Duration in UDF") {
// Regular case
val input = Seq(java.time.Duration.ofHours(23)).toDF("d")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]