This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3000c000d14 [SPARK-41127][CONNECT][PYTHON] Implement
DataFrame.CreateGlobalView in Python client
3000c000d14 is described below
commit 3000c000d14c7e2c0cb3853550ba1d037d351a32
Author: Rui Wang <[email protected]>
AuthorDate: Tue Nov 15 16:28:59 2022 +0900
[SPARK-41127][CONNECT][PYTHON] Implement DataFrame.CreateGlobalView in
Python client
### What changes were proposed in this pull request?
This PR adds `CreateGlobalTempView` and `CreateOrReplaceGlobalTempView` to
Python DataFrame API.
Meanwhile, this PR extends `LogicalPlan` to let it have the ability to deal
with `Command`.
### Why are the changes needed?
Improve API coverage.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UT
Closes #38642 from amaliujia/create_temp_view_in_python.
Authored-by: Rui Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client.py | 7 ++++
python/pyspark/sql/connect/dataframe.py | 34 ++++++++++++++++
python/pyspark/sql/connect/plan.py | 45 ++++++++++++++++++++++
.../sql/tests/connect/test_connect_basic.py | 17 +++++++-
4 files changed, 101 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/sql/connect/client.py
b/python/pyspark/sql/connect/client.py
index c2d808bb6ee..62949720134 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -400,6 +400,13 @@ class RemoteSparkSession(object):
def explain_string(self, plan: pb2.Plan) -> str:
return self._analyze(plan).explain_string
+ def execute_command(self, command: pb2.Command) -> None:
+ req = pb2.Request()
+ if self._user_id:
+ req.user_context.user_id = self._user_id
+ req.plan.command.CopyFrom(command)
+ self._execute_and_fetch(req)
+
def _analyze(self, plan: pb2.Plan) -> AnalyzeResult:
req = pb2.Request()
if self._user_id:
diff --git a/python/pyspark/sql/connect/dataframe.py
b/python/pyspark/sql/connect/dataframe.py
index 4e9c6f919a8..0dccd9ffb97 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -676,6 +676,40 @@ class DataFrame(object):
else:
return ""
+ def createGlobalTempView(self, name: str) -> None:
+ """Creates a global temporary view with this :class:`DataFrame`.
+
+ The lifetime of this temporary view is tied to this Spark application.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ name : str
+ Name of the view.
+ """
+ command = plan.CreateView(
+ child=self._plan, name=name, is_global=True, replace=False
+ ).command(session=self._session)
+ self._session.execute_command(command)
+
+ def createOrReplaceGlobalTempView(self, name: str) -> None:
+ """Creates or replaces a global temporary view using the given name.
+
+ The lifetime of this temporary view is tied to this Spark application.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ name : str
+ Name of the view.
+ """
+ command = plan.CreateView(
+ child=self._plan, name=name, is_global=True, replace=True
+ ).command(session=self._session)
+ self._session.execute_command(command)
+
class DataFrameStatFunctions:
"""Functionality for statistic functions with :class:`DataFrame`.
diff --git a/python/pyspark/sql/connect/plan.py
b/python/pyspark/sql/connect/plan.py
index aa80730c651..c457056d5a4 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -69,6 +69,9 @@ class LogicalPlan(object):
def plan(self, session: "RemoteSparkSession") -> proto.Relation:
...
+ def command(self, session: "RemoteSparkSession") -> proto.Command:
+ ...
+
def _verify(self, session: "RemoteSparkSession") -> bool:
"""This method is used to verify that the current logical plan
can be serialized to Proto and back and afterwards is identical."""
@@ -902,3 +905,45 @@ class StatCrosstab(LogicalPlan):
</li>
</ul>
"""
+
+
+class CreateView(LogicalPlan):
+ def __init__(
+ self, child: Optional["LogicalPlan"], name: str, is_global: bool,
replace: bool
+ ) -> None:
+ super().__init__(child)
+ self._name = name
+ self._is_gloal = is_global
+ self._replace = replace
+
+ def command(self, session: "RemoteSparkSession") -> proto.Command:
+ assert self._child is not None
+
+ plan = proto.Command()
+ plan.create_dataframe_view.replace = self._replace
+ plan.create_dataframe_view.is_global = self._is_gloal
+ plan.create_dataframe_view.name = self._name
+ plan.create_dataframe_view.input.CopyFrom(self._child.plan(session))
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return (
+ f"{i}"
+ f"<CreateView name='{self._name}' "
+ f"is_global='{self._is_gloal} "
+ f"replace='{self._replace}'>"
+ )
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>CreateView</b><br />
+ name: {self._name} <br />
+ is_global: {self._is_gloal} <br />
+ replace: {self._replace} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 1314699887c..bb020db3c79 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -20,8 +20,9 @@ import shutil
import tempfile
import grpc # type: ignore
+from grpc._channel import _MultiThreadedRendezvous # type: ignore
-from pyspark.testing.sqlutils import have_pandas
+from pyspark.testing.sqlutils import have_pandas, SQLTestUtils
if have_pandas:
import pandas
@@ -39,7 +40,7 @@ from pyspark.testing.utils import ReusedPySparkTestCase
@unittest.skipIf(not should_test_connect, connect_requirement_message)
-class SparkConnectSQLTestCase(ReusedPySparkTestCase):
+class SparkConnectSQLTestCase(ReusedPySparkTestCase, SQLTestUtils):
"""Parent test fixture class for all Spark Connect related
test cases."""
@@ -207,6 +208,18 @@ class SparkConnectTests(SparkConnectSQLTestCase):
.equals(self.spark.range(start=0, end=10, step=3,
numPartitions=2).toPandas())
)
+ def test_create_global_temp_view(self):
+ # SPARK-41127: test global temp view creation.
+ with self.tempView("view_1"):
+ self.connect.sql("SELECT 1 AS X LIMIT
0").createGlobalTempView("view_1")
+ self.connect.sql("SELECT 2 AS X LIMIT
1").createOrReplaceGlobalTempView("view_1")
+
self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1"))
+
+ # Test when creating a view which already exists.
+
self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1"))
+ with self.assertRaises(_MultiThreadedRendezvous):
+ self.connect.sql("SELECT 1 AS X LIMIT
0").createGlobalTempView("view_1")
+
def test_empty_dataset(self):
# SPARK-41005: Test arrow based collection with empty dataset.
self.assertTrue(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]