This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3000c000d14 [SPARK-41127][CONNECT][PYTHON] Implement DataFrame.CreateGlobalView in Python client
3000c000d14 is described below

commit 3000c000d14c7e2c0cb3853550ba1d037d351a32
Author: Rui Wang <[email protected]>
AuthorDate: Tue Nov 15 16:28:59 2022 +0900

    [SPARK-41127][CONNECT][PYTHON] Implement DataFrame.CreateGlobalView in Python client
    
    ### What changes were proposed in this pull request?
    
    This PR adds `createGlobalTempView` and `createOrReplaceGlobalTempView` to the Python DataFrame API.
    
    This PR also extends `LogicalPlan` so that a plan can produce a `Command` in addition to a `Relation`.
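    
    For illustration, a minimal usage sketch of the new API (the session handle `spark` and the view name are hypothetical, assuming a Spark Connect session):
    
    ```python
    df = spark.sql("SELECT 1 AS id")
    # Fails if a global temp view with this name already exists.
    df.createGlobalTempView("my_view")
    # Replaces the existing view instead of failing.
    df.createOrReplaceGlobalTempView("my_view")
    # Global temp views are resolved via the reserved global_temp database.
    spark.sql("SELECT * FROM global_temp.my_view")
    ```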
    
    ### Why are the changes needed?
    
    Improve API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    UT
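    
    (For example, the touched suite can typically be run with `./python/run-tests --testnames 'pyspark.sql.tests.connect.test_connect_basic'` from a Spark source checkout; the runner and flag are existing Spark dev tooling, and the Connect prerequisites are assumed to be installed.)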
    
    Closes #38642 from amaliujia/create_temp_view_in_python.
    
    Authored-by: Rui Wang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/client.py               |  7 ++++
 python/pyspark/sql/connect/dataframe.py            | 34 ++++++++++++++++
 python/pyspark/sql/connect/plan.py                 | 45 ++++++++++++++++++++++
 .../sql/tests/connect/test_connect_basic.py        | 17 +++++++-
 4 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index c2d808bb6ee..62949720134 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -400,6 +400,13 @@ class RemoteSparkSession(object):
     def explain_string(self, plan: pb2.Plan) -> str:
         return self._analyze(plan).explain_string
 
+    def execute_command(self, command: pb2.Command) -> None:
+        req = pb2.Request()
+        if self._user_id:
+            req.user_context.user_id = self._user_id
+        req.plan.command.CopyFrom(command)
+        self._execute_and_fetch(req)
+
     def _analyze(self, plan: pb2.Plan) -> AnalyzeResult:
         req = pb2.Request()
         if self._user_id:
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 4e9c6f919a8..0dccd9ffb97 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -676,6 +676,40 @@ class DataFrame(object):
         else:
             return ""
 
+    def createGlobalTempView(self, name: str) -> None:
+        """Creates a global temporary view with this :class:`DataFrame`.
+
+        The lifetime of this temporary view is tied to this Spark application.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        name : str
+            Name of the view.
+        """
+        command = plan.CreateView(
+            child=self._plan, name=name, is_global=True, replace=False
+        ).command(session=self._session)
+        self._session.execute_command(command)
+
+    def createOrReplaceGlobalTempView(self, name: str) -> None:
+        """Creates or replaces a global temporary view using the given name.
+
+        The lifetime of this temporary view is tied to this Spark application.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        name : str
+            Name of the view.
+        """
+        command = plan.CreateView(
+            child=self._plan, name=name, is_global=True, replace=True
+        ).command(session=self._session)
+        self._session.execute_command(command)
+
 
 class DataFrameStatFunctions:
     """Functionality for statistic functions with :class:`DataFrame`.
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index aa80730c651..c457056d5a4 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -69,6 +69,9 @@ class LogicalPlan(object):
     def plan(self, session: "RemoteSparkSession") -> proto.Relation:
         ...
 
+    def command(self, session: "RemoteSparkSession") -> proto.Command:
+        ...
+
     def _verify(self, session: "RemoteSparkSession") -> bool:
         """This method is used to verify that the current logical plan
         can be serialized to Proto and back and afterwards is identical."""
@@ -902,3 +905,45 @@ class StatCrosstab(LogicalPlan):
            </li>
         </ul>
         """
+
+
+class CreateView(LogicalPlan):
+    def __init__(
+        self, child: Optional["LogicalPlan"], name: str, is_global: bool, replace: bool
+    ) -> None:
+        super().__init__(child)
+        self._name = name
+        self._is_global = is_global
+        self._replace = replace
+
+    def command(self, session: "RemoteSparkSession") -> proto.Command:
+        assert self._child is not None
+
+        plan = proto.Command()
+        plan.create_dataframe_view.replace = self._replace
+        plan.create_dataframe_view.is_global = self._is_global
+        plan.create_dataframe_view.name = self._name
+        plan.create_dataframe_view.input.CopyFrom(self._child.plan(session))
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return (
+            f"{i}"
+            f"<CreateView name='{self._name}' "
+            f"is_global='{self._is_gloal} "
+            f"replace='{self._replace}'>"
+        )
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>CreateView</b><br />
+              name: {self._name} <br />
+              is_global: {self._is_global} <br />
+              replace: {self._replace} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 1314699887c..bb020db3c79 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -20,8 +20,9 @@ import shutil
 import tempfile
 
 import grpc  # type: ignore
+from grpc._channel import _MultiThreadedRendezvous  # type: ignore
 
-from pyspark.testing.sqlutils import have_pandas
+from pyspark.testing.sqlutils import have_pandas, SQLTestUtils
 
 if have_pandas:
     import pandas
@@ -39,7 +40,7 @@ from pyspark.testing.utils import ReusedPySparkTestCase
 
 
 @unittest.skipIf(not should_test_connect, connect_requirement_message)
-class SparkConnectSQLTestCase(ReusedPySparkTestCase):
+class SparkConnectSQLTestCase(ReusedPySparkTestCase, SQLTestUtils):
     """Parent test fixture class for all Spark Connect related
     test cases."""
 
@@ -207,6 +208,18 @@ class SparkConnectTests(SparkConnectSQLTestCase):
             .equals(self.spark.range(start=0, end=10, step=3, numPartitions=2).toPandas())
         )
 
+    def test_create_global_temp_view(self):
+        # SPARK-41127: test global temp view creation.
+        with self.tempView("view_1"):
+            self.connect.sql("SELECT 1 AS X LIMIT 
0").createGlobalTempView("view_1")
+            self.connect.sql("SELECT 2 AS X LIMIT 
1").createOrReplaceGlobalTempView("view_1")
+            self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1"))
+
+            # Creating a view that already exists must fail unless replace is used.
+            self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1"))
+            with self.assertRaises(_MultiThreadedRendezvous):
+                self.connect.sql("SELECT 1 AS X LIMIT 
0").createGlobalTempView("view_1")
+
     def test_empty_dataset(self):
         # SPARK-41005: Test arrow based collection with empty dataset.
         self.assertTrue(

