This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 45bb9578a0f [SPARK-40813][CONNECT][PYTHON][FOLLOW-UP] Improve limit and offset in Python client
45bb9578a0f is described below
commit 45bb9578a0f6b40b472588a407d842f293e9e323
Author: Rui Wang <[email protected]>
AuthorDate: Fri Oct 21 10:33:18 2022 +0900
[SPARK-40813][CONNECT][PYTHON][FOLLOW-UP] Improve limit and offset in Python client
### What changes were proposed in this pull request?
Following up on https://github.com/apache/spark/pull/38275, this improves limit and offset in the Python client: offset becomes its own logical plan node and DataFrame method rather than a parameter of Limit.
### Why are the changes needed?
Improve API coverage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes #38314 from amaliujia/python_test_limit_offset.
Authored-by: Rui Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/dataframe.py | 3 ++
python/pyspark/sql/connect/plan.py | 32 ++++++++++++++++++++--
.../sql/tests/connect/test_connect_basic.py | 7 +++++
.../sql/tests/connect/test_connect_plan_only.py | 10 +++++++
4 files changed, 49 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 1f7e789818f..5ca747fdd6a 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -199,6 +199,9 @@ class DataFrame(object):
def limit(self, n: int) -> "DataFrame":
return DataFrame.withPlan(plan.Limit(child=self._plan, limit=n), session=self._session)
+ def offset(self, n: int) -> "DataFrame":
+ return DataFrame.withPlan(plan.Offset(child=self._plan, offset=n), session=self._session)
+
def sort(self, *cols: "ColumnOrString") -> "DataFrame":
"""Sort by a specific column"""
return DataFrame.withPlan(plan.Sort(self._plan, *cols), session=self._session)
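
For illustration, a minimal usage sketch of the new method on a Spark Connect DataFrame; the session name spark and table name my_table are hypothetical, while read.table, limit, offset, and toPandas are the calls exercised in the tests below:

    # Hypothetical sketch: paging through a table with the new offset() method.
    df = spark.read.table("my_table")   # `spark` is assumed to be a Spark Connect session
    page = df.offset(20).limit(10)      # skip the first 20 rows, then keep at most 10
    pdf = page.toPandas()               # execution happens only here, on the server
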
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index c564b71cdba..5b8b7c71866 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -272,10 +272,9 @@ class Filter(LogicalPlan):
class Limit(LogicalPlan):
- def __init__(self, child: Optional["LogicalPlan"], limit: int, offset: int = 0) -> None:
+ def __init__(self, child: Optional["LogicalPlan"], limit: int) -> None:
super().__init__(child)
self.limit = limit
- self.offset = offset
def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
assert self._child is not None
@@ -286,7 +285,7 @@ class Limit(LogicalPlan):
def print(self, indent: int = 0) -> str:
c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else ""
- return f"{' ' * indent}<Limit limit={self.limit} offset={self.offset}>\n{c_buf}"
+ return f"{' ' * indent}<Limit limit={self.limit}>\n{c_buf}"
def _repr_html_(self) -> str:
return f"""
@@ -294,6 +293,33 @@ class Limit(LogicalPlan):
<li>
<b>Limit</b><br />
Limit: {self.limit} <br />
+ {self._child_repr_()}
+ </li>
+ </uL>
+ """
+
+
+class Offset(LogicalPlan):
+ def __init__(self, child: Optional["LogicalPlan"], offset: int = 0) -> None:
+ super().__init__(child)
+ self.offset = offset
+
+ def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
+ assert self._child is not None
+ plan = proto.Relation()
+ plan.offset.input.CopyFrom(self._child.plan(session))
+ plan.offset.offset = self.offset
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else ""
+ return f"{' ' * indent}<Offset={self.offset}>\n{c_buf}"
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>Offset</b><br />
Offset: {self.offset} <br />
{self._child_repr_()}
</li>
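
As a rough sketch of the Relation message that the new Offset.plan() builds (the proto import path is assumed to match what plan.py already uses; the field names come from the diff above):

    # Hedged sketch of the proto shape produced by Offset.plan().
    import pyspark.sql.connect.proto as proto  # import path assumed

    child = proto.Relation()                   # stands in for the child plan's relation
    rel = proto.Relation()
    rel.offset.input.CopyFrom(child)           # nest the child relation under the offset node
    rel.offset.offset = 5                      # number of rows to skip
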
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index de300946932..f6988a1d120 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -106,6 +106,13 @@ class SparkConnectTests(SparkConnectSQLTestCase):
res = pandas.DataFrame(data={"id": [0, 30, 60, 90]})
self.assert_(pd.equals(res), f"{pd.to_string()} != {res.to_string()}")
+ def test_limit_offset(self):
+ df = self.connect.read.table(self.tbl_name)
+ pd = df.limit(10).offset(1).toPandas()
+ self.assertEqual(9, len(pd.index))
+ pd2 = df.offset(98).limit(10).toPandas()
+ self.assertEqual(2, len(pd2.index))
+
def test_simple_datasource_read(self) -> None:
writeDf = self.df_text
tmpPath = tempfile.mkdtemp()
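
For context on the asserted row counts: limit(10).offset(1) wraps the limit in an offset node, so one of the ten limited rows is skipped and nine remain; offset(98).limit(10) first skips 98 rows and then caps the remainder, leaving two rows on the assumption that the test table holds 100 rows (implied by the assertion, not stated in the diff).
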
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index 96bbb8aa834..739c24ca96e 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -44,6 +44,16 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
self.assertEqual(plan.root.filter.condition.unresolved_function.parts, [">"])
self.assertEqual(len(plan.root.filter.condition.unresolved_function.arguments), 2)
+ def test_limit(self):
+ df = self.connect.readTable(table_name=self.tbl_name)
+ limit_plan = df.limit(10)._plan.to_proto(self.connect)
+ self.assertEqual(limit_plan.root.limit.limit, 10)
+
+ def test_offset(self):
+ df = self.connect.readTable(table_name=self.tbl_name)
+ offset_plan = df.offset(10)._plan.to_proto(self.connect)
+ self.assertEqual(offset_plan.root.offset.offset, 10)
+
def test_relation_alias(self):
df = self.connect.readTable(table_name=self.tbl_name)
plan = df.alias("table_alias")._plan.to_proto(self.connect)
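
A hedged sketch of how chained calls nest in the resulting proto, extending the plan-only tests above; connect stands for the test fixture's session object, the table name is made up, and the limit.input field name is assumed by symmetry with offset.input:

    # Hypothetical sketch: inspect the nesting of chained limit/offset without a server.
    df = connect.readTable(table_name="my_table")        # fixture-style reader, name assumed
    rel = df.limit(10).offset(2)._plan.to_proto(connect)
    assert rel.root.offset.offset == 2                   # outermost call becomes the root node
    assert rel.root.offset.input.limit.limit == 10       # the limit node is nested underneath
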
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]