This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 03e6de2 [SPARK-35605][PYTHON] Move to_pandas_on_spark to the Spark
DataFrame
03e6de2 is described below
commit 03e6de2abe6b767e55de2761499ae937db588508
Author: itholic <[email protected]>
AuthorDate: Mon Jun 28 11:47:09 2021 +0900
[SPARK-35605][PYTHON] Move to_pandas_on_spark to the Spark DataFrame
### What changes were proposed in this pull request?
This PR proposes moving the `to_pandas_on_spark` function from
`pyspark.pandas.frame` to `pyspark.sql.dataframe`, and adds the related tests
to the PySpark DataFrame test suite.
### Why are the changes needed?
Because Koalas is now ported into PySpark, we no longer need the Spark
auto-patch.
Also, having `to_pandas_on_spark` belong only to the pandas-on-Spark DataFrame
doesn't make sense.
### Does this PR introduce _any_ user-facing change?
No, it's kinda internal refactoring stuff.
### How was this patch tested?
Added the related tests and manually checked that they pass.
Closes #33054 from itholic/SPARK-35605.
Authored-by: itholic <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../docs/source/reference/pyspark.pandas/frame.rst | 1 -
python/docs/source/reference/pyspark.sql.rst | 1 +
python/pyspark/pandas/__init__.py | 16 -----
python/pyspark/pandas/frame.py | 75 ----------------------
python/pyspark/pandas/plot/core.py | 2 +-
python/pyspark/sql/dataframe.py | 63 ++++++++++++++++++
python/pyspark/sql/dataframe.pyi | 2 +
python/pyspark/sql/tests/test_dataframe.py | 16 +++++
8 files changed, 83 insertions(+), 93 deletions(-)
diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst
b/python/docs/source/reference/pyspark.pandas/frame.rst
index fb39b1d..642bfd4 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -257,7 +257,6 @@ Serialization / IO / Conversion
DataFrame.to_pandas
DataFrame.to_html
DataFrame.to_numpy
- DataFrame.to_pandas_on_spark
DataFrame.to_spark
DataFrame.to_string
DataFrame.to_json
diff --git a/python/docs/source/reference/pyspark.sql.rst
b/python/docs/source/reference/pyspark.sql.rst
index a9d4af8..dbdb060 100644
--- a/python/docs/source/reference/pyspark.sql.rst
+++ b/python/docs/source/reference/pyspark.sql.rst
@@ -218,6 +218,7 @@ DataFrame APIs
DataFrame.write
DataFrame.writeStream
DataFrame.writeTo
+ DataFrame.to_pandas_on_spark
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace
diff --git a/python/pyspark/pandas/__init__.py
b/python/pyspark/pandas/__init__.py
index 12e54ad..eed9872 100644
--- a/python/pyspark/pandas/__init__.py
+++ b/python/pyspark/pandas/__init__.py
@@ -133,22 +133,6 @@ def _auto_patch_spark() -> None:
)
)
- # Autopatching is on by default.
- x = os.getenv("SPARK_KOALAS_AUTOPATCH", "true")
- if x.lower() in ("true", "1", "enabled"):
- logger = logging.getLogger("spark")
- logger.info(
- "Patching spark automatically. You can disable it by setting "
- "SPARK_KOALAS_AUTOPATCH=false in your environment"
- )
-
- from pyspark.sql import dataframe as df
-
- df.DataFrame.to_pandas_on_spark = DataFrame.to_pandas_on_spark #
type: ignore
-
- # Keep to_koalas for backward compatibility for now.
- df.DataFrame.to_koalas = DataFrame.to_koalas # type: ignore
-
_frame_has_class_getitem = False
_series_has_class_getitem = False
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index caee343..7f26346 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -4565,81 +4565,6 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
"""
return self.dot(other)
- def to_pandas_on_spark(self, index_col: Optional[Union[str, List[str]]] =
None) -> "DataFrame":
- """
- Converts the existing DataFrame into a pandas-on-Spark DataFrame.
-
- This method is monkey-patched into Spark's DataFrame and can be used
- to convert a Spark DataFrame into a pandas-on-Spark DataFrame. If
running on
- an existing pandas-on-Spark DataFrame, the method returns itself.
-
- If a pandas-on-Spark DataFrame is converted to a Spark DataFrame and
then back
- to pandas-on-Spark, it will lose the index information and the
original index
- will be turned into a normal column.
-
- Parameters
- ----------
- index_col: str or list of str, optional, default: None
- Index column of table in Spark.
-
- See Also
- --------
- DataFrame.to_spark
-
- Examples
- --------
- >>> df = ps.DataFrame({'col1': [1, 2], 'col2': [3, 4]},
columns=['col1', 'col2'])
- >>> df
- col1 col2
- 0 1 3
- 1 2 4
-
- >>> spark_df = df.to_spark()
- >>> spark_df
- DataFrame[col1: bigint, col2: bigint]
-
- >>> psdf = spark_df.to_pandas_on_spark()
- >>> psdf
- col1 col2
- 0 1 3
- 1 2 4
-
- We can specify the index columns.
-
- >>> psdf = spark_df.to_pandas_on_spark(index_col='col1')
- >>> psdf # doctest: +NORMALIZE_WHITESPACE
- col2
- col1
- 1 3
- 2 4
-
- Calling to_pandas_on_spark on a pandas-on-Spark DataFrame simply
returns itself.
-
- >>> df.to_pandas_on_spark()
- col1 col2
- 0 1 3
- 1 2 4
- """
- if isinstance(self, DataFrame):
- return self
- else:
- assert isinstance(self, SparkDataFrame), type(self)
- from pyspark.pandas.namespace import _get_index_map
-
- index_spark_columns, index_names = _get_index_map(self, index_col)
- internal = InternalFrame(
- spark_frame=self, index_spark_columns=index_spark_columns,
index_names=index_names
- )
- return DataFrame(internal)
-
- # Keep to_koalas for backward compatibility for now.
- def to_koalas(self, index_col: Optional[Union[str, List[str]]] = None) ->
"DataFrame":
- warnings.warn(
- "DataFrame.to_koalas is deprecated. Use
DataFrame.to_pandas_on_spark instead.",
- FutureWarning,
- )
- return self.to_pandas_on_spark(index_col)
-
def to_table(
self,
name: str,
diff --git a/python/pyspark/pandas/plot/core.py
b/python/pyspark/pandas/plot/core.py
index cb412f1..72b8ac5 100644
--- a/python/pyspark/pandas/plot/core.py
+++ b/python/pyspark/pandas/plot/core.py
@@ -20,7 +20,7 @@ import importlib
import pandas as pd
import numpy as np
from pyspark.ml.feature import Bucketizer
-from pyspark.mllib.stat import KernelDensity
+from pyspark.mllib.stat import KernelDensity # type: ignore
from pyspark.sql import functions as F
from pandas.core.base import PandasObject
from pandas.core.dtypes.inference import is_integer
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a859696..8613788 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2695,6 +2695,69 @@ class DataFrame(PandasMapOpsMixin,
PandasConversionMixin):
"""
return DataFrameWriterV2(self, table)
+ def to_pandas_on_spark(self, index_col=None):
+ """
+ Converts the existing DataFrame into a pandas-on-Spark DataFrame.
+
+ If a pandas-on-Spark DataFrame is converted to a Spark DataFrame and
then back
+ to pandas-on-Spark, it will lose the index information and the
original index
+ will be turned into a normal column.
+
+ This is only available if Pandas is installed and available.
+
+ Parameters
+ ----------
+ index_col: str or list of str, optional, default: None
+ Index column of table in Spark.
+
+ See Also
+ --------
+ pyspark.pandas.frame.DataFrame.to_spark
+
+ Examples
+ --------
+ >>> df.show() # doctest: +SKIP
+ +----+----+
+ |Col1|Col2|
+ +----+----+
+ | a| 1|
+ | b| 2|
+ | c| 3|
+ +----+----+
+
+ >>> df.to_pandas_on_spark() # doctest: +SKIP
+ Col1 Col2
+ 0 a 1
+ 1 b 2
+ 2 c 3
+
+ We can specify the index columns.
+
+        >>> df.to_pandas_on_spark(index_col="Col1")  # doctest: +SKIP
+ Col2
+ Col1
+ a 1
+ b 2
+ c 3
+ """
+ from pyspark.pandas.namespace import _get_index_map
+ from pyspark.pandas.frame import DataFrame
+ from pyspark.pandas.internal import InternalFrame
+
+ index_spark_columns, index_names = _get_index_map(self, index_col)
+ internal = InternalFrame(
+ spark_frame=self, index_spark_columns=index_spark_columns,
index_names=index_names
+ )
+ return DataFrame(internal)
+
+ # Keep to_koalas for backward compatibility for now.
+ def to_koalas(self, index_col=None):
+ warnings.warn(
+ "DataFrame.to_koalas is deprecated. Use
DataFrame.to_pandas_on_spark instead.",
+ FutureWarning,
+ )
+ return self.to_pandas_on_spark(index_col)
+
def _to_scala_map(sc, jm):
"""
diff --git a/python/pyspark/sql/dataframe.pyi b/python/pyspark/sql/dataframe.pyi
index af1bac6..9e762bf 100644
--- a/python/pyspark/sql/dataframe.pyi
+++ b/python/pyspark/sql/dataframe.pyi
@@ -49,6 +49,7 @@ from pyspark.storagelevel import StorageLevel
from pyspark.sql.pandas.conversion import PandasConversionMixin
from pyspark.sql.pandas.map_ops import PandasMapOpsMixin
+from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
sql_ctx: SQLContext
@@ -267,6 +268,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
def semanticHash(self) -> int: ...
def inputFiles(self) -> List[str]: ...
def writeTo(self, table: str) -> DataFrameWriterV2: ...
+ def to_pandas_on_spark(self, index_col: Optional[Union[str, List[str]]] =
None) -> PandasOnSparkDataFrame: ...
class DataFrameNaFunctions:
df: DataFrame
diff --git a/python/pyspark/sql/tests/test_dataframe.py
b/python/pyspark/sql/tests/test_dataframe.py
index 74895c0..a17448d 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -855,6 +855,22 @@ class DataFrameTests(ReusedSQLTestCase):
with self.assertRaisesRegex(TypeError, "Parameter 'truncate=foo'"):
df.show(truncate='foo')
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ pandas_requirement_message or pyarrow_requirement_message) # type:
ignore
+ def test_to_pandas_on_spark(self):
+ import pandas as pd
+ from pandas.testing import assert_frame_equal
+
+ sdf = self.spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)],
["Col1", "Col2"])
+ psdf_from_sdf = sdf.to_pandas_on_spark()
+ psdf_from_sdf_with_index = sdf.to_pandas_on_spark(index_col="Col1")
+ pdf = pd.DataFrame({"Col1": ["a", "b", "c"], "Col2": [1, 2, 3]})
+ pdf_with_index = pdf.set_index("Col1")
+
+ assert_frame_equal(pdf, psdf_from_sdf.to_pandas())
+ assert_frame_equal(pdf_with_index,
psdf_from_sdf_with_index.to_pandas())
+
class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
# These tests are separate because it uses
'spark.sql.queryExecutionListeners' which is
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]