This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 103ba7bf773 [SPARK-38793][PYTHON] Support `return_indexer` parameter
of `Index/MultiIndex.sort_values`
103ba7bf773 is described below
commit 103ba7bf773ce401eab8e31ae0a2fe67a30e57f9
Author: Xinrong Meng <[email protected]>
AuthorDate: Wed Apr 13 12:17:35 2022 +0900
[SPARK-38793][PYTHON] Support `return_indexer` parameter of
`Index/MultiIndex.sort_values`
### What changes were proposed in this pull request?
Support the `return_indexer` parameter of `Index/MultiIndex.sort_values`.
Note that this method returns the indexer as a pandas-on-Spark Index, while
pandas returns it as a list. That's because the indexer in pandas-on-Spark may
not fit in memory.
### Why are the changes needed?
To reach parity with pandas.
### Does this PR introduce _any_ user-facing change?
Yes. The `return_indexer` parameter of `Index/MultiIndex.sort_values` is now
supported, as shown below.
Index
```py
>>> idx = ps.Index([10, 100, 1, 1000])
>>> idx
Int64Index([10, 100, 1, 1000], dtype='int64')
>>> idx.sort_values(ascending=False, return_indexer=True)
(Int64Index([1000, 100, 10, 1], dtype='int64'), Int64Index([3, 1,
0, 2], dtype='int64'))
```
MultiIndex
```py
>>> psidx = ps.MultiIndex.from_tuples([('a', 'x', 1), ('c', 'y',
2), ('b', 'z', 3)])
>>> psidx
MultiIndex([('a', 'x', 1),
('c', 'y', 2),
('b', 'z', 3)],
)
>>> psidx.sort_values(ascending=False, return_indexer=True)
(MultiIndex([('c', 'y', 2),
('b', 'z', 3),
('a', 'x', 1)],
), Int64Index([1, 2, 0], dtype='int64'))
```
### How was this patch tested?
Unit test.
Closes #36076 from xinrong-databricks/index.sort_values.
Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/indexes/base.py | 53 ++++++++++++++++++++----
python/pyspark/pandas/series.py | 6 +--
python/pyspark/pandas/tests/indexes/test_base.py | 31 +++++++++++---
3 files changed, 73 insertions(+), 17 deletions(-)
diff --git a/python/pyspark/pandas/indexes/base.py
b/python/pyspark/pandas/indexes/base.py
index d7119f032f3..da2bb5dc9c9 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -1543,17 +1543,24 @@ class Index(IndexOpsMixin):
return result
- # TODO: return_indexer
- def sort_values(self, ascending: bool = True) -> "Index":
+ def sort_values(
+ self, return_indexer: bool = False, ascending: bool = True
+ ) -> Union["Index", Tuple["Index", "Index"]]:
"""
- Return a sorted copy of the index.
+ Return a sorted copy of the index, and optionally return the indices
that
+ sorted the index itself.
.. note:: This method is not supported for pandas when index has NaN
value.
pandas raises unexpected TypeError, but we support treating
NaN
as the smallest value.
+ This method returns indexer as a pandas-on-Spark index while
+ pandas returns it as a list. That's because indexer in
pandas-on-Spark
+ may not fit in memory.
Parameters
----------
+ return_indexer : bool, default False
+ Should the indices that would sort the index be returned.
ascending : bool, default True
Should the index values be sorted in an ascending order.
@@ -1561,6 +1568,8 @@ class Index(IndexOpsMixin):
-------
sorted_index : ps.Index or ps.MultiIndex
Sorted copy of the index.
+ indexer : ps.Index
+ The indices that the index itself was sorted by.
See Also
--------
@@ -1583,6 +1592,11 @@ class Index(IndexOpsMixin):
>>> idx.sort_values(ascending=False)
Int64Index([1000, 100, 10, 1], dtype='int64')
+ Sort values in descending order, and also get the indices idx was
sorted by.
+
+ >>> idx.sort_values(ascending=False, return_indexer=True)
+ (Int64Index([1000, 100, 10, 1], dtype='int64'), Int64Index([3, 1, 0,
2], dtype='int64'))
+
Support for MultiIndex.
>>> psidx = ps.MultiIndex.from_tuples([('a', 'x', 1), ('c', 'y', 2),
('b', 'z', 3)])
@@ -1603,11 +1617,20 @@ class Index(IndexOpsMixin):
('b', 'z', 3),
('a', 'x', 1)],
)
+
+ >>> psidx.sort_values(ascending=False, return_indexer=True) #
doctest: +SKIP
+ (MultiIndex([('c', 'y', 2),
+ ('b', 'z', 3),
+ ('a', 'x', 1)],
+ ), Int64Index([1, 2, 0], dtype='int64'))
"""
sdf = self._internal.spark_frame
- sdf = sdf.orderBy(*self._internal.index_spark_columns,
ascending=ascending).select(
- self._internal.index_spark_columns
- )
+ if return_indexer:
+ sequence_col = verify_temp_column_name(sdf,
"__distributed_sequence_column__")
+ sdf = InternalFrame.attach_distributed_sequence_column(sdf,
column_name=sequence_col)
+
+ ordered_sdf = sdf.orderBy(*self._internal.index_spark_columns,
ascending=ascending)
+ sdf = ordered_sdf.select(self._internal.index_spark_columns)
internal = InternalFrame(
spark_frame=sdf,
@@ -1617,7 +1640,21 @@ class Index(IndexOpsMixin):
index_names=self._internal.index_names,
index_fields=self._internal.index_fields,
)
- return DataFrame(internal).index
+ sorted_index = DataFrame(internal).index
+
+ if return_indexer:
+ alias_sequence_scol = scol_for(ordered_sdf, sequence_col).alias(
+ SPARK_DEFAULT_INDEX_NAME
+ )
+ indexer_sdf = ordered_sdf.select(alias_sequence_scol)
+ indexer_internal = InternalFrame(
+ spark_frame=indexer_sdf,
+ index_spark_columns=[scol_for(indexer_sdf,
SPARK_DEFAULT_INDEX_NAME)],
+ )
+ indexer = DataFrame(indexer_internal).index
+ return sorted_index, indexer
+ else:
+ return sorted_index
@no_type_check
def sort(self, *args, **kwargs) -> None:
@@ -2136,7 +2173,7 @@ class Index(IndexOpsMixin):
elif isinstance(self, type(other)) and not isinstance(self,
MultiIndex):
if self.name == other.name:
result.name = self.name
- return result if sort is None else result.sort_values()
+ return result if sort is None else cast(Index, result.sort_values())
@property
def is_all_dates(self) -> bool:
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 9856b59947a..ced81b12e8c 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -5304,9 +5304,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
dtype: int64
"""
if not same_anchor(self, other):
- if get_option("compute.eager_check") and not
self.index.sort_values().equals(
- other.index.sort_values()
- ):
+ if get_option("compute.eager_check") and not cast(
+ ps.Index, self.index.sort_values()
+ ).equals(cast(ps.Index, other.index.sort_values())):
raise ValueError("matrices are not aligned")
elif len(self.index) != len(other.index):
raise ValueError("matrices are not aligned")
diff --git a/python/pyspark/pandas/tests/indexes/test_base.py
b/python/pyspark/pandas/tests/indexes/test_base.py
index de138b58c68..5379e512825 100644
--- a/python/pyspark/pandas/tests/indexes/test_base.py
+++ b/python/pyspark/pandas/tests/indexes/test_base.py
@@ -802,18 +802,38 @@ class IndexesTest(ComparisonTestBase, TestUtils):
psidx.names = ["lv", "lv"]
self.assertRaises(ValueError, lambda: psidx.drop(["x", "y"],
level="lv"))
+ def _test_sort_values(self, pidx, psidx):
+ self.assert_eq(pidx.sort_values(), psidx.sort_values())
+ # Parameter ascending
+ self.assert_eq(pidx.sort_values(ascending=False),
psidx.sort_values(ascending=False))
+ # Parameter return_indexer
+ p_sorted, p_indexer = pidx.sort_values(return_indexer=True)
+ ps_sorted, ps_indexer = psidx.sort_values(return_indexer=True)
+ self.assert_eq(p_sorted, ps_sorted)
+ self.assert_eq(p_indexer, ps_indexer.to_list())
+ self.assert_eq(
+ pidx.sort_values(return_indexer=False),
psidx.sort_values(return_indexer=False)
+ )
+ # Parameter return_indexer and ascending
+ p_sorted, p_indexer = pidx.sort_values(return_indexer=True,
ascending=False)
+ ps_sorted, ps_indexer = psidx.sort_values(return_indexer=True,
ascending=False)
+ self.assert_eq(p_sorted, ps_sorted)
+ self.assert_eq(p_indexer, ps_indexer.to_list())
+ self.assert_eq(
+ pidx.sort_values(return_indexer=False, ascending=False),
+ psidx.sort_values(return_indexer=False, ascending=False),
+ )
+
def test_sort_values(self):
pidx = pd.Index([-10, -100, 200, 100])
psidx = ps.from_pandas(pidx)
- self.assert_eq(pidx.sort_values(), psidx.sort_values())
- self.assert_eq(pidx.sort_values(ascending=False),
psidx.sort_values(ascending=False))
+ self._test_sort_values(pidx, psidx)
pidx.name = "koalas"
psidx.name = "koalas"
- self.assert_eq(pidx.sort_values(), psidx.sort_values())
- self.assert_eq(pidx.sort_values(ascending=False),
psidx.sort_values(ascending=False))
+ self._test_sort_values(pidx, psidx)
pidx = pd.MultiIndex.from_tuples([("a", "x", 1), ("b", "y", 2), ("c",
"z", 3)])
psidx = ps.from_pandas(pidx)
@@ -821,8 +841,7 @@ class IndexesTest(ComparisonTestBase, TestUtils):
pidx.names = ["hello", "koalas", "goodbye"]
psidx.names = ["hello", "koalas", "goodbye"]
- self.assert_eq(pidx.sort_values(), psidx.sort_values())
- self.assert_eq(pidx.sort_values(ascending=False),
psidx.sort_values(ascending=False))
+ self._test_sort_values(pidx, psidx)
def test_index_drop_duplicates(self):
pidx = pd.Index([1, 1, 2])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]