This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 92fcf214c10 [SPARK-39155][PYTHON] Access to JVM through passed-in GatewayClient during type conversion
92fcf214c10 is described below

commit 92fcf214c107358c1a70566b644cec2d35c096c0
Author: Xinrong Meng <xinrong.m...@databricks.com>
AuthorDate: Thu May 12 12:22:11 2022 +0900

    [SPARK-39155][PYTHON] Access to JVM through passed-in GatewayClient during type conversion
    
    ### What changes were proposed in this pull request?
    Access the JVM through the passed-in GatewayClient during type conversion.
    
    ### Why are the changes needed?
    In customized type converters, we may use the passed-in GatewayClient to access the JVM, rather than relying on `SparkContext._jvm`.
    
    That's [how](https://github.com/py4j/py4j/blob/master/py4j-python/src/py4j/java_collections.py#L508) Py4J's explicit converters access the JVM.
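    
    For illustration, a minimal sketch of a customized converter following the same pattern; the `DecimalConverter` name and the `java.math.BigDecimal` target are hypothetical and not part of this change:
    
    ```python
    import decimal
    
    from py4j.java_gateway import GatewayClient, JavaClass, JavaObject
    from py4j.protocol import register_input_converter
    
    
    class DecimalConverter:
        """Hypothetical converter: Python Decimal -> java.math.BigDecimal."""
    
        def can_convert(self, obj: object) -> bool:
            return isinstance(obj, decimal.Decimal)
    
        def convert(self, obj: decimal.Decimal, gateway_client: GatewayClient) -> JavaObject:
            # Resolve the Java class through the passed-in GatewayClient,
            # with no dependency on SparkContext._jvm.
            BigDecimal = JavaClass("java.math.BigDecimal", gateway_client)
            return BigDecimal(str(obj))
    
    
    # Py4J applies registered input converters automatically whenever a
    # matching Python object crosses the Python-to-Java boundary.
    register_input_converter(DecimalConverter())
    ```
    
    This mirrors what the converters in `python/pyspark/sql/types.py` do after this change.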
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #36504 from xinrong-databricks/gateway_client_jvm.
    
    Authored-by: Xinrong Meng <xinrong.m...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/types.py | 30 ++++++++++++++----------------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 2a41508d634..123fd628980 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -44,7 +44,7 @@ from typing import (
 )
 
 from py4j.protocol import register_input_converter
-from py4j.java_gateway import JavaClass, JavaGateway, JavaObject
+from py4j.java_gateway import GatewayClient, JavaClass, JavaObject
 
 from pyspark.serializers import CloudPickleSerializer
 
@@ -1929,7 +1929,7 @@ class DateConverter:
     def can_convert(self, obj: Any) -> bool:
         return isinstance(obj, datetime.date)
 
-    def convert(self, obj: datetime.date, gateway_client: JavaGateway) -> JavaObject:
+    def convert(self, obj: datetime.date, gateway_client: GatewayClient) -> JavaObject:
         Date = JavaClass("java.sql.Date", gateway_client)
         return Date.valueOf(obj.strftime("%Y-%m-%d"))
 
@@ -1938,7 +1938,7 @@ class DatetimeConverter:
     def can_convert(self, obj: Any) -> bool:
         return isinstance(obj, datetime.datetime)
 
-    def convert(self, obj: datetime.datetime, gateway_client: JavaGateway) -> JavaObject:
+    def convert(self, obj: datetime.datetime, gateway_client: GatewayClient) -> JavaObject:
         Timestamp = JavaClass("java.sql.Timestamp", gateway_client)
         seconds = (
             calendar.timegm(obj.utctimetuple()) if obj.tzinfo else time.mktime(obj.timetuple())
@@ -1958,27 +1958,25 @@ class DatetimeNTZConverter:
             and is_timestamp_ntz_preferred()
         )
 
-    def convert(self, obj: datetime.datetime, gateway_client: JavaGateway) -> JavaObject:
-        from pyspark import SparkContext
-
+    def convert(self, obj: datetime.datetime, gateway_client: GatewayClient) -> JavaObject:
         seconds = calendar.timegm(obj.utctimetuple())
-        jvm = SparkContext._jvm
-        assert jvm is not None
-        return jvm.org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToLocalDateTime(
-            int(seconds) * 1000000 + obj.microsecond
+        DateTimeUtils = JavaClass(
+            "org.apache.spark.sql.catalyst.util.DateTimeUtils",
+            gateway_client,
         )
+        return DateTimeUtils.microsToLocalDateTime(int(seconds) * 1000000 + obj.microsecond)
 
 
 class DayTimeIntervalTypeConverter:
     def can_convert(self, obj: Any) -> bool:
         return isinstance(obj, datetime.timedelta)
 
-    def convert(self, obj: datetime.timedelta, gateway_client: JavaGateway) -> JavaObject:
-        from pyspark import SparkContext
-
-        jvm = SparkContext._jvm
-        assert jvm is not None
-        return jvm.org.apache.spark.sql.catalyst.util.IntervalUtils.microsToDuration(
+    def convert(self, obj: datetime.timedelta, gateway_client: GatewayClient) -> JavaObject:
+        IntervalUtils = JavaClass(
+            "org.apache.spark.sql.catalyst.util.IntervalUtils",
+            gateway_client,
+        )
+        return IntervalUtils.microsToDuration(
             (math.floor(obj.total_seconds()) * 1000000) + obj.microseconds
         )
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
