Repository: spark
Updated Branches:
  refs/heads/master 6b68366df -> 424b0075a


[SPARK-6411] [SQL] [PySpark] support date/datetime with timezone in Python

Spark SQL does not support timezones, and Pyrolite does not handle them 
well either. This patch converts datetimes into POSIX timestamps (avoiding 
any timezone confusion), which is what SQL uses internally. A datetime 
object without a timezone is treated as local time.
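
For illustration, a standalone sketch of this conversion (it mirrors the 
to_posix_timstamp helper added to python/pyspark/sql/types.py below): a 
naive datetime goes through time.mktime (local time), an aware one through 
calendar.timegm, and the result is expressed in units of 100ns.

    import calendar
    import time

    def to_posix_timestamp(dt):
        """Timestamp in units of 100ns (1e-7 s), the resolution SQL uses."""
        if dt:
            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                       else time.mktime(dt.timetuple()))
            return int(seconds * 1e7 + dt.microsecond * 10)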

The timezone in an RDD is lost after one round trip: all datetimes coming 
back from SQL are in local time.
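
A minimal round-trip sketch of that behavior (the app name is arbitrary, 
and the UTC helper mirrors the one the new test adds): a timezone-aware 
datetime comes back as a naive local datetime denoting the same instant.

    import datetime
    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    class UTC(datetime.tzinfo):
        """Fixed UTC offset, as in the new test below."""
        def utcoffset(self, dt): return datetime.timedelta(0)
        def tzname(self, dt): return "UTC"
        def dst(self, dt): return datetime.timedelta(0)

    sc = SparkContext("local", "tz-demo")
    sqlCtx = SQLContext(sc)

    aware = datetime.datetime.now(UTC())      # timezone-aware input
    (result,) = sqlCtx.createDataFrame([(aware,)]).first()
    assert result.tzinfo is None              # tzinfo gone after round trip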

Because of Pyrolite, datetimes coming from SQL have a precision of only 
1 millisecond.

This PR also drops the timezone from dates, converting them to the number 
of days since the epoch (as used in SQL).
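
The date conversion is the mirror image, matching the EPOCH_ORDINAL logic 
added to types.py below: dates become days since the Unix epoch, computed 
from proleptic Gregorian ordinals.

    import datetime

    EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()

    def date_to_days(d):
        """Days since the Unix epoch (1970-01-01 is day 0)."""
        if d:
            return d.toordinal() - EPOCH_ORDINAL

    assert date_to_days(datetime.date(1970, 1, 2)) == 1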

Author: Davies Liu <[email protected]>

Closes #6250 from davies/tzone and squashes the following commits:

44d8497 [Davies Liu] add timezone support for DateType
99d9d9c [Davies Liu] use int for timestamp
10aa7ca [Davies Liu] Merge branch 'master' of github.com:apache/spark into tzone
6a29aa4 [Davies Liu] support datetime with timezone


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/424b0075
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/424b0075
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/424b0075

Branch: refs/heads/master
Commit: 424b0075a1a31c251451c6a75c6ba8e81c39453d
Parents: 6b68366
Author: Davies Liu <[email protected]>
Authored: Thu Jun 11 01:00:41 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Thu Jun 11 01:00:41 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/tests.py                     | 32 ++++++++++++++++++++
 python/pyspark/sql/types.py                     | 27 +++++++++++------
 .../apache/spark/sql/execution/pythonUdfs.scala |  3 +-
 3 files changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/424b0075/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a6fce50..b5fbb7d 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -26,6 +26,7 @@ import shutil
 import tempfile
 import pickle
 import functools
+import time
 import datetime
 
 import py4j
@@ -47,6 +48,20 @@ from pyspark.sql.functions import UserDefinedFunction
 from pyspark.sql.window import Window
 
 
+class UTC(datetime.tzinfo):
+    """UTC"""
+    ZERO = datetime.timedelta(0)
+
+    def utcoffset(self, dt):
+        return self.ZERO
+
+    def tzname(self, dt):
+        return "UTC"
+
+    def dst(self, dt):
+        return self.ZERO
+
+
 class ExamplePointUDT(UserDefinedType):
     """
     User-defined type (UDT) for ExamplePoint.
@@ -588,6 +603,23 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertEqual(0, df.filter(df.date > date).count())
         self.assertEqual(0, df.filter(df.time > time).count())
 
+    def test_time_with_timezone(self):
+        day = datetime.date.today()
+        now = datetime.datetime.now()
+        ts = time.mktime(now.timetuple()) + now.microsecond / 1e6
+        # class in __main__ is not serializable
+        from pyspark.sql.tests import UTC
+        utc = UTC()
+        utcnow = datetime.datetime.fromtimestamp(ts, utc)
+        df = self.sqlCtx.createDataFrame([(day, now, utcnow)])
+        day1, now1, utcnow1 = df.first()
+        # Pyrolite serializes java.sql.Date as datetime; will be fixed in a new version
+        self.assertEqual(day1.date(), day)
+        # Pyrolite does not support microsecond, the error should be
+        # less than 1 millisecond
+        self.assertTrue(now - now1 < datetime.timedelta(milliseconds=1))
+        self.assertTrue(now - utcnow1 < datetime.timedelta(milliseconds=1))
+
     def test_dropna(self):
         schema = StructType([
             StructField("name", StringType(), True),

http://git-wip-us.apache.org/repos/asf/spark/blob/424b0075/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 8f286b6..23d9adb 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType):
             _need_python_to_sql_conversion(dataType.valueType)
     elif isinstance(dataType, UserDefinedType):
         return True
-    elif isinstance(dataType, TimestampType):
+    elif isinstance(dataType, (DateType, TimestampType)):
         return True
     else:
         return False
 
 
+EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
+
+
 def _python_to_sql_converter(dataType):
     """
     Returns a converter that converts a Python object into a SQL datum for the given type.
@@ -698,26 +701,32 @@ def _python_to_sql_converter(dataType):
                     return tuple(c(d.get(n)) for n, c in zip(names, converters))
                 else:
                     return tuple(c(v) for c, v in zip(converters, obj))
-            else:
+            elif obj is not None:
                 raise ValueError("Unexpected tuple %r with type %r" % (obj, 
dataType))
         return converter
     elif isinstance(dataType, ArrayType):
         element_converter = _python_to_sql_converter(dataType.elementType)
-        return lambda a: [element_converter(v) for v in a]
+        return lambda a: a and [element_converter(v) for v in a]
     elif isinstance(dataType, MapType):
         key_converter = _python_to_sql_converter(dataType.keyType)
         value_converter = _python_to_sql_converter(dataType.valueType)
-        return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
+        return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
+
     elif isinstance(dataType, UserDefinedType):
-        return lambda obj: dataType.serialize(obj)
+        return lambda obj: obj and dataType.serialize(obj)
+
+    elif isinstance(dataType, DateType):
+        return lambda d: d and d.toordinal() - EPOCH_ORDINAL
+
     elif isinstance(dataType, TimestampType):
 
         def to_posix_timstamp(dt):
-            if dt.tzinfo is None:
-                return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
-            else:
-                return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
+            if dt:
+                seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+                           else time.mktime(dt.timetuple()))
+                return int(seconds * 1e7 + dt.microsecond * 10)
         return to_posix_timstamp
+
     else:
         raise ValueError("Unexpected type %r" % dataType)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/424b0075/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 955b478..b1333ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -28,8 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Row, _}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule

