This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b0450d0  [SPARK-26902][SQL] Support java.time.Instant as an external type of TimestampType
b0450d0 is described below

commit b0450d07bd5a77a519a662351ca5b5d562e61a58
Author: Maxim Gekk <maxim.g...@databricks.com>
AuthorDate: Wed Feb 27 21:05:19 2019 +0800

    [SPARK-26902][SQL] Support java.time.Instant as an external type of TimestampType
    
    ## What changes were proposed in this pull request?
    
    In this PR, I propose to add a new Catalyst type converter for `TimestampType`. It should be able to convert `java.time.Instant` to/from `TimestampType`.
    
    Main motivations for the changes:
    - Smoothly support Java 8 time API
    - Avoid inconsistency between the calendar used inside Spark 3.0 (Proleptic Gregorian calendar) and the one used by `java.sql.Timestamp` (hybrid Julian + Gregorian calendar).
    - Make the conversion independent of the current system time zone.
    
    By default, Spark converts values of `TimestampType` to `java.sql.Timestamp` instances, but the SQL config `spark.sql.catalyst.timestampType` can change this behavior. It accepts two values: `Timestamp` (default) and `Instant`. If the latter is set, Spark returns `java.time.Instant` instances for timestamp values.
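    
    For example, one way to exercise the new behavior (a minimal sketch, assuming a running `SparkSession` named `spark` built with this change; the timestamp literal and column name are illustrative only):
    
        spark.conf.set("spark.sql.catalyst.timestampType", "Instant")
        // Collecting a TimestampType column now yields java.time.Instant values
        // instead of java.sql.Timestamp.
        val t = spark.sql("SELECT TIMESTAMP '2019-02-26 23:59:59Z' AS t")
          .collect().head.get(0)
        assert(t.isInstanceOf[java.time.Instant])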
    
    ## How was this patch tested?
    
    Added new tests to `CatalystTypeConvertersSuite` to check conversion of `TimestampType` to/from `java.time.Instant`.
    
    Closes #23811 from MaxGekk/timestamp-instant.
    
    Lead-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Co-authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/CatalystTypeConverters.scala      | 14 ++++++++
 .../sql/catalyst/DeserializerBuildHelper.scala     |  9 +++++
 .../spark/sql/catalyst/JavaTypeInference.scala     |  3 ++
 .../spark/sql/catalyst/ScalaReflection.scala       | 12 +++++++
 .../spark/sql/catalyst/encoders/RowEncoder.scala   | 22 ++++++++++++-
 .../spark/sql/catalyst/util/DateTimeUtils.scala    |  6 ++++
 .../sql/catalyst/util/TimestampFormatter.scala     |  5 +--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  8 +++++
 .../sql/catalyst/CatalystTypeConvertersSuite.scala | 38 ++++++++++++++++++++--
 .../sql/catalyst/encoders/RowEncoderSuite.scala    | 15 ++++++++-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala | 11 +++++++
 11 files changed, 135 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 93df73a..a20625b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -21,6 +21,7 @@ import java.lang.{Iterable => JavaIterable}
 import java.math.{BigDecimal => JavaBigDecimal}
 import java.math.{BigInteger => JavaBigInteger}
 import java.sql.{Date, Timestamp}
+import java.time.Instant
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
@@ -29,6 +30,7 @@ import scala.language.existentials
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -61,6 +63,7 @@ object CatalystTypeConverters {
       case structType: StructType => StructConverter(structType)
       case StringType => StringConverter
       case DateType => DateConverter
+      case TimestampType if SQLConf.get.timestampExternalType == "Instant" => InstantConverter
       case TimestampType => TimestampConverter
       case dt: DecimalType => new DecimalConverter(dt)
       case BooleanType => BooleanConverter
@@ -315,6 +318,16 @@ object CatalystTypeConverters {
       DateTimeUtils.toJavaTimestamp(row.getLong(column))
   }
 
+  private object InstantConverter extends CatalystTypeConverter[Instant, Instant, Any] {
+    override def toCatalystImpl(scalaValue: Instant): Long =
+      DateTimeUtils.instantToMicros(scalaValue)
+    override def toScala(catalystValue: Any): Instant =
+      if (catalystValue == null) null
+      else DateTimeUtils.microsToInstant(catalystValue.asInstanceOf[Long])
+    override def toScalaImpl(row: InternalRow, column: Int): Instant =
+      DateTimeUtils.microsToInstant(row.getLong(column))
+  }
+
   private class DecimalConverter(dataType: DecimalType)
     extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
     override def toCatalystImpl(scalaValue: Any): Decimal = {
@@ -421,6 +434,7 @@ object CatalystTypeConverters {
     case s: String => StringConverter.toCatalyst(s)
     case d: Date => DateConverter.toCatalyst(d)
     case t: Timestamp => TimestampConverter.toCatalyst(t)
+    case i: Instant => InstantConverter.toCatalyst(i)
    case d: BigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
    case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
    case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index e71955a..3a2f386 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -101,6 +101,15 @@ object DeserializerBuildHelper {
       returnNullable = false)
   }
 
+  def createDeserializerForInstant(path: Expression): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      ObjectType(classOf[java.time.Instant]),
+      "microsToInstant",
+      path :: Nil,
+      returnNullable = false)
+  }
+
   def createDeserializerForSqlTimestamp(path: Expression): Expression = {
     StaticInvoke(
       DateTimeUtils.getClass,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index dafa878..1822f9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -224,6 +224,9 @@ object JavaTypeInference {
       case c if c == classOf[java.sql.Date] =>
         createDeserializerForSqlDate(path)
 
+      case c if c == classOf[java.time.Instant] =>
+        createDeserializerForInstant(path)
+
       case c if c == classOf[java.sql.Timestamp] =>
         createDeserializerForSqlTimestamp(path)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 741cba8..26cc7b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -197,6 +197,9 @@ object ScalaReflection extends ScalaReflection {
       case t if t <:< localTypeOf[java.sql.Date] =>
         createDeserializerForSqlDate(path)
 
+      case t if t <:< localTypeOf[java.time.Instant] =>
+        createDeserializerForInstant(path)
+
       case t if t <:< localTypeOf[java.sql.Timestamp] =>
         createDeserializerForSqlTimestamp(path)
 
@@ -474,6 +477,14 @@ object ScalaReflection extends ScalaReflection {
           inputObject :: Nil,
           returnNullable = false)
 
+      case t if t <:< localTypeOf[java.time.Instant] =>
+        StaticInvoke(
+          DateTimeUtils.getClass,
+          TimestampType,
+          "instantToMicros",
+          inputObject :: Nil,
+          returnNullable = false)
+
       case t if t <:< localTypeOf[java.sql.Timestamp] =>
         StaticInvoke(
           DateTimeUtils.getClass,
@@ -691,6 +702,7 @@ object ScalaReflection extends ScalaReflection {
         val Schema(dataType, nullable) = schemaFor(elementType)
         Schema(ArrayType(dataType, containsNull = nullable), nullable = true)
      case t if t <:< localTypeOf[String] => Schema(StringType, nullable = true)
+      case t if t <:< localTypeOf[java.time.Instant] => Schema(TimestampType, nullable = true)
      case t if t <:< localTypeOf[java.sql.Timestamp] => Schema(TimestampType, nullable = true)
      case t if t <:< localTypeOf[java.sql.Date] => Schema(DateType, nullable = true)
      case t if t <:< localTypeOf[BigDecimal] => Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 8ca3d35..eba6881 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -46,7 +47,8 @@ import org.apache.spark.unsafe.types.UTF8String
  *   DecimalType -> java.math.BigDecimal or scala.math.BigDecimal or Decimal
  *
  *   DateType -> java.sql.Date
- *   TimestampType -> java.sql.Timestamp
+ *   TimestampType -> java.sql.Timestamp when spark.sql.catalyst.timestampType is set to Timestamp
+ *   TimestampType -> java.time.Instant when spark.sql.catalyst.timestampType is set to Instant
  *
  *   BinaryType -> byte array
  *   ArrayType -> scala.collection.Seq or Array
@@ -89,6 +91,14 @@ object RowEncoder {
         dataType = ObjectType(udtClass), false)
       Invoke(obj, "serialize", udt, inputObject :: Nil, returnNullable = false)
 
+    case TimestampType if SQLConf.get.timestampExternalType == "Instant" =>
+      StaticInvoke(
+        DateTimeUtils.getClass,
+        TimestampType,
+        "instantToMicros",
+        inputObject :: Nil,
+        returnNullable = false)
+
     case TimestampType =>
       StaticInvoke(
         DateTimeUtils.getClass,
@@ -224,6 +234,8 @@ object RowEncoder {
 
   def externalDataTypeFor(dt: DataType): DataType = dt match {
     case _ if ScalaReflection.isNativeType(dt) => dt
+    case TimestampType if SQLConf.get.timestampExternalType == "Instant" =>
+      ObjectType(classOf[java.time.Instant])
     case TimestampType => ObjectType(classOf[java.sql.Timestamp])
     case DateType => ObjectType(classOf[java.sql.Date])
     case _: DecimalType => ObjectType(classOf[java.math.BigDecimal])
@@ -267,6 +279,14 @@ object RowEncoder {
         dataType = ObjectType(udtClass))
       Invoke(obj, "deserialize", ObjectType(udt.userClass), input :: Nil)
 
+    case TimestampType if SQLConf.get.timestampExternalType == "Instant" =>
+      StaticInvoke(
+        DateTimeUtils.getClass,
+        ObjectType(classOf[java.time.Instant]),
+        "microsToInstant",
+        input :: Nil,
+        returnNullable = false)
+
     case TimestampType =>
       StaticInvoke(
         DateTimeUtils.getClass,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 627670a..742cee6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -355,6 +355,12 @@ object DateTimeUtils {
     result
   }
 
+  def microsToInstant(us: Long): Instant = {
+    val secs = Math.floorDiv(us, MICROS_PER_SECOND)
+    val mos = Math.floorMod(us, MICROS_PER_SECOND)
+    Instant.ofEpochSecond(secs, mos * NANOS_PER_MICROS)
+  }
+
   def instantToDays(instant: Instant): Int = {
     val seconds = instant.getEpochSecond
     val days = Math.floorDiv(seconds, SECONDS_PER_DAY)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 4ec61e1..c254815 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -61,10 +61,7 @@ class Iso8601TimestampFormatter(
   override def parse(s: String): Long = instantToMicros(toInstant(s))
 
   override def format(us: Long): String = {
-    val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)
-    val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND)
-    val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS)
-
+    val instant = DateTimeUtils.microsToInstant(us)
     formatter.withZone(timeZone.toZoneId).format(instant)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e74c2af..2cf471e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1682,6 +1682,12 @@ object SQLConf {
         "a SparkConf entry.")
       .booleanConf
       .createWithDefault(true)
+
+  val TIMESTAMP_EXTERNAL_TYPE = buildConf("spark.sql.catalyst.timestampType")
+    .doc("Java class to/from which an instance of TimestampType is converted.")
+    .stringConf
+    .checkValues(Set("Timestamp", "Instant"))
+    .createWithDefault("Timestamp")
 }
 
 /**
@@ -1869,6 +1875,8 @@ class SQLConf extends Serializable with Logging {
 
   def fastHashAggregateRowMaxCapacityBit: Int = getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT)
 
+  def timestampExternalType: String = getConf(TIMESTAMP_EXTERNAL_TYPE)
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index 89452ee..828e8d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -17,14 +17,18 @@
 
 package org.apache.spark.sql.catalyst
 
+import java.time.Instant
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
-import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
-class CatalystTypeConvertersSuite extends SparkFunSuite {
+class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
 
   private val simpleTypes: Seq[DataType] = Seq(
     StringType,
@@ -147,4 +151,34 @@ class CatalystTypeConvertersSuite extends SparkFunSuite {
     val expected = UTF8String.fromString("X")
     assert(converter(chr) === expected)
   }
+
+  test("converting java.time.Instant to TimestampType") {
+    Seq(
+      "0101-02-16T10:11:32Z",
+      "1582-10-02T01:02:03.04Z",
+      "1582-12-31T23:59:59.999999Z",
+      "1970-01-01T00:00:01.123Z",
+      "1972-12-31T23:59:59.123456Z",
+      "2019-02-16T18:12:30Z",
+      "2119-03-16T19:13:31Z").foreach { timestamp =>
+      val input = Instant.parse(timestamp)
+      val result = CatalystTypeConverters.convertToCatalyst(input)
+      val expected = DateTimeUtils.instantToMicros(input)
+      assert(result === expected)
+    }
+  }
+
+  test("converting TimestampType to java.time.Instant") {
+    withSQLConf(SQLConf.TIMESTAMP_EXTERNAL_TYPE.key -> "Instant") {
+      Seq(
+        -9463427405253013L,
+        -244000001L,
+        0L,
+        99628200102030L,
+        1543749753123456L).foreach { us =>
+        val instant = DateTimeUtils.microsToInstant(us)
+        assert(CatalystTypeConverters.createToScalaConverter(TimestampType)(us) === instant)
+      }
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index ab819be..691056d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -21,7 +21,8 @@ import scala.util.Random
 
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
-import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 @SQLUserDefinedType(udt = classOf[ExamplePointUDT])
@@ -281,6 +282,18 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
     assert(encoder.serializer(0).dataType == pythonUDT.sqlType)
   }
 
+  test("encoding/decoding TimestampType to/from java.time.Instant") {
+    withSQLConf(SQLConf.TIMESTAMP_EXTERNAL_TYPE.key -> "Instant") {
+      val schema = new StructType().add("t", TimestampType)
+      val encoder = RowEncoder(schema).resolveAndBind()
+      val instant = java.time.Instant.parse("2019-02-26T16:56:00Z")
+      val row = encoder.toRow(Row(instant))
+      assert(row.getLong(0) === DateTimeUtils.instantToMicros(instant))
+      val readback = encoder.fromRow(row)
+      assert(readback.get(0) === instant)
+    }
+  }
+
   for {
     elementType <- Seq(IntegerType, StringType)
     containsNull <- Seq(true, false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index e515800..0f5f0ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, ExplainCommand}
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.functions.{lit, udf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
@@ -493,4 +494,14 @@ class UDFSuite extends QueryTest with SharedSQLContext {
       checkAnswer(df, Seq(Row(1L), Row(2L)))
     }
   }
+
+  test("Using java.time.Instant in UDF") {
+    withSQLConf(SQLConf.TIMESTAMP_EXTERNAL_TYPE.key -> "Instant") {
+      val expected = java.time.Instant.parse("2019-02-27T00:00:00Z")
+      val plusSec = udf((i: java.time.Instant) => i.plusSeconds(1))
+      val df = spark.sql("SELECT TIMESTAMP '2019-02-26 23:59:59Z' as t")
+        .select(plusSec('t))
+      assert(df.collect().toSeq === Seq(Row(expected)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
