This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d67d73b [SPARK-35505][PYTHON] Remove APIs which have been deprecated in Koalas
d67d73b is described below
commit d67d73b70860d4e56fdcd6fc61f826245a52d186
Author: Takuya UESHIN <[email protected]>
AuthorDate: Tue May 25 11:16:27 2021 -0700
[SPARK-35505][PYTHON] Remove APIs which have been deprecated in Koalas
### What changes were proposed in this pull request?
Removes APIs which have been deprecated in Koalas.
### Why are the changes needed?
There are some APIs that have been deprecated in Koalas. We shouldn't carry
them over into pandas API on Spark.
### Does this PR introduce _any_ user-facing change?
Yes, the APIs deprecated in Koalas will no longer be available.
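For reference, a minimal migration sketch for the removed aliases. This is an
illustrative snippet, not part of the change itself; each replacement is the API
named in the FutureWarning that the removed alias used to emit, and the accessor
names (`spark`, `koalas`) are as of this commit:

    import pyspark.pandas as ps

    psdf = ps.DataFrame({"a": [1, 2, 3]})

    psdf.spark.cache()                                # was psdf.cache()
    psdf.spark.persist()                              # was psdf.persist()
    psdf.spark.hint("broadcast")                      # was psdf.hint("broadcast")
    psdf.spark.schema()                               # was psdf.spark_schema()
    psdf.spark.print_schema()                         # was psdf.print_schema()
    psdf.spark.explain()                              # was psdf.explain()
    psdf.to_pandas()                                  # was psdf.toPandas()
    psdf.koalas.apply_batch(lambda pdf: pdf + 1)      # was psdf.apply_batch(...)
    psdf.koalas.transform_batch(lambda pdf: pdf + 1)  # was psdf.transform_batch(...)
    psdf.a.spark.data_type                            # was psdf.a.spark_type
    psdf.a.rename("b")                                # was psdf.a.alias("b")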
### How was this patch tested?
Modified the tests that used the deprecated APIs; the other existing tests
should pass.
Closes #32656 from ueshin/issues/SPARK-35505/remove_deprecated.
Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
---
python/pyspark/pandas/base.py | 12 ---
python/pyspark/pandas/frame.py | 123 --------------------------
python/pyspark/pandas/indexes/base.py | 21 +----
python/pyspark/pandas/indexes/multi.py | 11 ---
python/pyspark/pandas/series.py | 40 ---------
python/pyspark/pandas/tests/test_dataframe.py | 79 ++++++++++-------
6 files changed, 46 insertions(+), 240 deletions(-)
diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index cb8a2c9..55a2c48 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -22,7 +22,6 @@ from abc import ABCMeta, abstractmethod
from functools import wraps, partial
from itertools import chain
from typing import Any, Callable, Optional, Tuple, Union, cast, TYPE_CHECKING
-import warnings
import numpy as np
import pandas as pd # noqa: F401
@@ -308,17 +307,6 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
pass
@property
- def spark_column(self) -> Column:
- warnings.warn(
- "Series.spark_column is deprecated as of Series.spark.column. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.column
-
- spark_column.__doc__ = SparkIndexOpsMethods.column.__doc__
-
- @property
def _dtype_op(self):
from pyspark.pandas.data_type_ops.base import DataTypeOps
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 9d2553d..709696e 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -2298,27 +2298,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
T = property(transpose)
- def apply_batch(self, func, args=(), **kwds) -> "DataFrame":
- warnings.warn(
- "DataFrame.apply_batch is deprecated as of
DataFrame.koalas.apply_batch. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.koalas.apply_batch(func, args=args, **kwds)
-
- apply_batch.__doc__ = PandasOnSparkFrameMethods.apply_batch.__doc__
-
- # TODO: Remove this API.
- def map_in_pandas(self, func) -> "DataFrame":
- warnings.warn(
- "DataFrame.map_in_pandas is deprecated as of
DataFrame.koalas.apply_batch. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.koalas.apply_batch(func)
-
- map_in_pandas.__doc__ = PandasOnSparkFrameMethods.apply_batch.__doc__
-
def apply(self, func, axis=0, args=(), **kwds) -> Union["Series", "DataFrame", "Index"]:
"""
Apply a function along an axis of the DataFrame.
@@ -2768,16 +2747,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
lambda psser: psser.koalas.transform_batch(func, *args, **kwargs)
)
- def transform_batch(self, func, *args, **kwargs) -> "DataFrame":
- warnings.warn(
- "DataFrame.transform_batch is deprecated as of
DataFrame.koalas.transform_batch. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.koalas.transform_batch(func, *args, **kwargs)
-
- transform_batch.__doc__ = PandasOnSparkFrameMethods.transform_batch.__doc__
-
def pop(self, item) -> "DataFrame":
"""
Return item and drop from frame. Raise KeyError if not found.
@@ -4573,36 +4542,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
)
return DataFrame(internal)
- def cache(self) -> "CachedDataFrame":
- warnings.warn(
- "DataFrame.cache is deprecated as of DataFrame.spark.cache. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.cache()
-
- cache.__doc__ = SparkFrameMethods.cache.__doc__
-
- def persist(self, storage_level=StorageLevel.MEMORY_AND_DISK) -> "CachedDataFrame":
- warnings.warn(
- "DataFrame.persist is deprecated as of DataFrame.spark.persist. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.persist(storage_level)
-
- persist.__doc__ = SparkFrameMethods.persist.__doc__
-
- def hint(self, name: str, *parameters) -> "DataFrame":
- warnings.warn(
- "DataFrame.hint is deprecated as of DataFrame.spark.hint. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.hint(name, *parameters)
-
- hint.__doc__ = SparkFrameMethods.hint.__doc__
-
def to_table(
self,
name: str,
@@ -4875,17 +4814,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
"""
return self._internal.to_pandas_frame.copy()
- # Alias to maintain backward compatibility with Spark
- def toPandas(self) -> pd.DataFrame:
- warnings.warn(
- "DataFrame.toPandas is deprecated as of DataFrame.to_pandas. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.to_pandas()
-
- toPandas.__doc__ = to_pandas.__doc__
-
def assign(self, **kwargs) -> "DataFrame":
"""
Assign new columns to a DataFrame.
@@ -6345,26 +6273,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
),
)
- def spark_schema(self, index_col: Optional[Union[str, List[str]]] = None) -> StructType:
- warnings.warn(
- "DataFrame.spark_schema is deprecated as of
DataFrame.spark.schema. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.schema(index_col)
-
- spark_schema.__doc__ = SparkFrameMethods.schema.__doc__
-
- def print_schema(self, index_col: Optional[Union[str, List[str]]] = None) -> None:
- warnings.warn(
- "DataFrame.print_schema is deprecated as of
DataFrame.spark.print_schema. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.print_schema(index_col)
-
- print_schema.__doc__ = SparkFrameMethods.print_schema.__doc__
-
def select_dtypes(self, include=None, exclude=None) -> "DataFrame":
"""
Return a subset of the DataFrame's columns based on the column dtypes.
@@ -10926,16 +10834,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
else:
return DataFrame(internal)
- def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None) -> None:
- warnings.warn(
- "DataFrame.explain is deprecated as of DataFrame.spark.explain. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.explain(extended, mode)
-
- explain.__doc__ = SparkFrameMethods.explain.__doc__
-
def take(self, indices, axis=0, **kwargs) -> "DataFrame":
"""
Return the elements in the given *positional* indices along an axis.
@@ -11929,27 +11827,6 @@ class CachedDataFrame(DataFrame):
# create accessor for Spark related methods.
spark = CachedAccessor("spark", CachedSparkFrameMethods)
- @property
- def storage_level(self) -> StorageLevel:
- warnings.warn(
- "DataFrame.storage_level is deprecated as of
DataFrame.spark.storage_level. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.storage_level
-
- storage_level.__doc__ = CachedSparkFrameMethods.storage_level.__doc__
-
- def unpersist(self) -> None:
- warnings.warn(
- "DataFrame.unpersist is deprecated as of
DataFrame.spark.unpersist. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.unpersist()
-
- unpersist.__doc__ = CachedSparkFrameMethods.unpersist.__doc__
-
def _test():
import os
diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py
index 67f89c0..f62f30d 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -38,7 +38,7 @@ from pandas._libs import lib
from pyspark import sql as spark
from pyspark.sql import functions as F
-from pyspark.sql.types import DataType, FractionalType, IntegralType, TimestampType
+from pyspark.sql.types import FractionalType, IntegralType, TimestampType
from pyspark import pandas as ps  # For running doctests and reference resolution in PyCharm.
from pyspark.pandas.config import get_option, option_context
@@ -483,15 +483,6 @@ class Index(IndexOpsMixin):
"""
return self._to_internal_pandas().copy()
- def toPandas(self) -> pd.Index:
- warnings.warn(
- "Index.toPandas is deprecated as of Index.to_pandas. Please use
the API instead.",
- FutureWarning,
- )
- return self.to_pandas()
-
- toPandas.__doc__ = to_pandas.__doc__
-
def to_numpy(self, dtype: Optional[Union[str, Dtype]] = None, copy: bool = False) -> np.ndarray:
"""
A NumPy ndarray representing the values in this Index or MultiIndex.
@@ -583,16 +574,6 @@ class Index(IndexOpsMixin):
return None
@property
- def spark_type(self) -> DataType:
- """ Returns the data type as defined by Spark, as a Spark DataType
object."""
- warnings.warn(
- "Index.spark_type is deprecated as of Index.spark.data_type. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.data_type
-
- @property
def has_duplicates(self) -> bool:
"""
If index has duplicates, return True, otherwise False.
diff --git a/python/pyspark/pandas/indexes/multi.py b/python/pyspark/pandas/indexes/multi.py
index 9c4e95c..ee40d06 100644
--- a/python/pyspark/pandas/indexes/multi.py
+++ b/python/pyspark/pandas/indexes/multi.py
@@ -18,7 +18,6 @@
from distutils.version import LooseVersion
from functools import partial
from typing import Any, Callable, Iterator, List, Optional, Tuple, Union, cast, no_type_check
-import warnings
import pandas as pd
from pandas.api.types import is_list_like
@@ -681,16 +680,6 @@ class MultiIndex(Index):
# series-like operations. In that case, it creates new Index object instead of MultiIndex.
return super().to_pandas()
- def toPandas(self) -> pd.MultiIndex:
- warnings.warn(
- "MultiIndex.toPandas is deprecated as of MultiIndex.to_pandas. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.to_pandas()
-
- toPandas.__doc__ = to_pandas.__doc__
-
def nunique(self, dropna: bool = True, approx: bool = False, rsd: float = 0.05) -> int:
raise NotImplementedError("nunique is not defined for MultiIndex")
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index d4c72a4..e20ddbc 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -22,7 +22,6 @@ import datetime
import re
import inspect
import sys
-import warnings
from collections.abc import Mapping
from functools import partial, wraps, reduce
from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar, Union, cast
@@ -467,17 +466,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
"""
return [self.index]
- @property
- def spark_type(self):
- warnings.warn(
- "Series.spark_type is deprecated as of Series.spark.data_type. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.spark.data_type
-
- spark_type.__doc__ = SparkSeriesMethods.data_type.__doc__
-
# Arithmetic Operators
def add(self, other) -> "Series":
return self + other
@@ -1017,14 +1005,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
else:
return self.apply(arg)
- def alias(self, name) -> "Series":
- """An alias for :meth:`Series.rename`."""
- warnings.warn(
- "Series.alias is deprecated as of Series.rename. Please use the
API instead.",
- FutureWarning,
- )
- return self.rename(name)
-
@property
def shape(self):
"""Return a tuple of the shape of the underlying data."""
@@ -1518,16 +1498,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
"""
return self._to_internal_pandas().copy()
- # Alias to maintain backward compatibility with Spark
- def toPandas(self) -> pd.Series:
- warnings.warn(
- "Series.toPandas is deprecated as of Series.to_pandas. Please use
the API instead.",
- FutureWarning,
- )
- return self.to_pandas()
-
- toPandas.__doc__ = to_pandas.__doc__
-
def to_list(self) -> List:
"""
Return a list of the values.
@@ -3300,16 +3270,6 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
else:
return self.apply(func, args=args, **kwargs)
- def transform_batch(self, func, *args, **kwargs) -> "ps.Series":
- warnings.warn(
- "Series.transform_batch is deprecated as of
Series.koalas.transform_batch. "
- "Please use the API instead.",
- FutureWarning,
- )
- return self.koalas.transform_batch(func, *args, **kwargs)
-
- transform_batch.__doc__ = PandasOnSparkSeriesMethods.transform_batch.__doc__
-
def round(self, decimals=0) -> "Series":
"""
Round each value in a Series to the given number of decimals.
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 590900f..5e6b6b9 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -28,6 +28,7 @@ from pandas.tseries.offsets import DateOffset
from pyspark import StorageLevel
from pyspark.ml.linalg import SparseVector
from pyspark.sql import functions as F
+from pyspark.sql.types import StructType
from pyspark import pandas as ps
from pyspark.pandas.config import option_context
@@ -1849,7 +1850,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
def test_to_pandas(self):
pdf, psdf = self.df_pair
- self.assert_eq(psdf.toPandas(), pdf)
self.assert_eq(psdf.to_pandas(), pdf)
def test_isin(self):
@@ -4324,8 +4324,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
)
psdf = ps.DataFrame(pdf)
- # One to test alias.
- self.assert_eq(psdf.apply_batch(lambda pdf: pdf + 1).sort_index(), (pdf + 1).sort_index())
self.assert_eq(
psdf.koalas.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(),
(pdf + 1).sort_index(),
@@ -4377,10 +4375,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
)
psdf = ps.DataFrame(pdf)
- # One to test alias.
- self.assert_eq(
- psdf.transform_batch(lambda pdf: pdf + 1).sort_index(), (pdf + 1).sort_index()
- )
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf: pdf.c + 1).sort_index(),
(pdf.c + 1).sort_index(),
@@ -4445,14 +4439,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
)
psdf = ps.range(10)
- # One to test alias.
- psdf["d"] = psdf.id.transform_batch(lambda ser: ser + 1)
- self.assert_eq(
- psdf,
- pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))},
columns=["id", "d"]),
- )
-
- psdf = ps.range(10)
def plus_one(pdf) -> ps.Series[np.int64]:
return pdf.id + 1
@@ -4796,10 +4782,10 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
)
psdf = ps.from_pandas(pdf)
- with psdf.cache() as cached_df:
+ with psdf.spark.cache() as cached_df:
self.assert_eq(isinstance(cached_df, CachedDataFrame), True)
self.assert_eq(
- repr(cached_df.storage_level), repr(StorageLevel(True, True, False, True))
+ repr(cached_df.spark.storage_level), repr(StorageLevel(True, True, False, True))
)
def test_persist(self):
@@ -4815,11 +4801,11 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
]
for storage_level in storage_levels:
- with psdf.persist(storage_level) as cached_df:
+ with psdf.spark.persist(storage_level) as cached_df:
self.assert_eq(isinstance(cached_df, CachedDataFrame), True)
- self.assert_eq(repr(cached_df.storage_level), repr(storage_level))
+ self.assert_eq(repr(cached_df.spark.storage_level), repr(storage_level))
- self.assertRaises(TypeError, lambda: psdf.persist("DISK_ONLY"))
+ self.assertRaises(TypeError, lambda: psdf.spark.persist("DISK_ONLY"))
def test_squeeze(self):
axises = [None, 0, 1, "rows", "index", "columns"]
@@ -5075,8 +5061,31 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
},
columns=["a", "b", "c", "d", "e", "f"],
)
- self.assertEqual(psdf.spark_schema(), psdf.spark.schema())
- self.assertEqual(psdf.spark_schema("index"),
psdf.spark.schema("index"))
+
+ actual = psdf.spark.schema()
+ expected = (
+ StructType()
+ .add("a", "string", False)
+ .add("b", "long", False)
+ .add("c", "byte", False)
+ .add("d", "double", False)
+ .add("e", "boolean", False)
+ .add("f", "timestamp", False)
+ )
+ self.assertEqual(actual, expected)
+
+ actual = psdf.spark.schema("index")
+ expected = (
+ StructType()
+ .add("index", "long", False)
+ .add("a", "string", False)
+ .add("b", "long", False)
+ .add("c", "byte", False)
+ .add("d", "double", False)
+ .add("e", "boolean", False)
+ .add("f", "timestamp", False)
+ )
+ self.assertEqual(actual, expected)
def test_print_schema(self):
psdf = ps.DataFrame(
@@ -5088,15 +5097,22 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
try:
out = StringIO()
sys.stdout = out
- psdf.print_schema()
+ psdf.spark.print_schema()
actual = out.getvalue().strip()
+ self.assertTrue("a: string" in actual, actual)
+ self.assertTrue("b: long" in actual, actual)
+ self.assertTrue("c: byte" in actual, actual)
+
out = StringIO()
sys.stdout = out
- psdf.spark.print_schema()
- expected = out.getvalue().strip()
+ psdf.spark.print_schema(index_col="index")
+ actual = out.getvalue().strip()
- self.assertEqual(actual, expected)
+ self.assertTrue("index: long" in actual, actual)
+ self.assertTrue("a: string" in actual, actual)
+ self.assertTrue("b: long" in actual, actual)
+ self.assertTrue("c: byte" in actual, actual)
finally:
sys.stdout = prev
@@ -5107,20 +5123,15 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
psdf2 = ps.DataFrame(
{"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]},
columns=["rkey", "value"]
)
- merged = psdf1.merge(psdf2.hint("broadcast"), left_on="lkey",
right_on="rkey")
+ merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey",
right_on="rkey")
prev = sys.stdout
try:
out = StringIO()
sys.stdout = out
- merged.explain()
- actual = out.getvalue().strip()
-
- out = StringIO()
- sys.stdout = out
merged.spark.explain()
- expected = out.getvalue().strip()
+ actual = out.getvalue().strip()
- self.assertEqual(actual, expected)
+ self.assertTrue("Broadcast" in actual, actual)
finally:
sys.stdout = prev
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]