[
https://issues.apache.org/jira/browse/SPARK-43611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Haejoon Lee updated SPARK-43611:
--------------------------------
Description:
Reproducible example:
{code:java}
>>> import pyspark.pandas as ps
>>> psdf1 = ps.DataFrame({"A": [1, 2, 3]})
>>> psdf2 = ps.DataFrame({"B": [1, 2, 3]})
>>> psdf1.append(psdf2)
/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py:8897:
FutureWarning: The DataFrame.append method is deprecated and will be removed in
a future version. Use pyspark.pandas.concat instead.
warnings.warn(
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
line 8930, in append
return cast(DataFrame, concat([self, other], ignore_index=ignore_index))
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/namespace.py",
line 2703, in concat
psdfs[0]._internal.copy(
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
line 1508, in copy
return InternalFrame(
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
line 753, in __init__
schema = spark_frame.select(data_spark_columns).schema
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/dataframe.py",
line 1650, in schema
return self._session.client.schema(query)
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 777, in schema
schema = self._analyze(method="schema", plan=plan).schema
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 958, in _analyze
self._handle_error(error)
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 1195, in _handle_error
self._handle_rpc_error(error)
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 1231, in _handle_rpc_error
raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.AnalysisException: When resolving 'A, fail to
find subplan with plan_id=16 in 'Project ['A, 'B]
+- Project [__index_level_0__#1101L, A#1102L, B#1157L,
monotonically_increasing_id() AS __natural_order__#1163L]
+- Union false, false
:- Project [__index_level_0__#1101L, A#1102L, cast(B#1116 as bigint) AS
B#1157L]
: +- Project [__index_level_0__#1101L, A#1102L, B#1116]
: +- Project [__index_level_0__#1101L, A#1102L,
__natural_order__#1108L, null AS B#1116]
: +- Project [__index_level_0__#1101L, A#1102L,
__natural_order__#1108L]
: +- Project [__index_level_0__#1101L, A#1102L,
monotonically_increasing_id() AS __natural_order__#1108L]
: +- Project [__index_level_0__#1097L AS
__index_level_0__#1101L, A#1098L AS A#1102L]
: +- LocalRelation [__index_level_0__#1097L, A#1098L]
+- Project [__index_level_0__#1137L, cast(A#1152 as bigint) AS A#1158L,
B#1138L]
+- Project [__index_level_0__#1137L, A#1152, B#1138L]
+- Project [__index_level_0__#1137L, B#1138L,
__natural_order__#1144L, null AS A#1152]
+- Project [__index_level_0__#1137L, B#1138L,
__natural_order__#1144L]
+- Project [__index_level_0__#1137L, B#1138L,
monotonically_increasing_id() AS __natural_order__#1144L]
+- Project [__index_level_0__#1133L AS
__index_level_0__#1137L, B#1134L AS B#1138L]
+- LocalRelation [__index_level_0__#1133L, B#1134L]
{code}
Another example (requires `import pandas as pd` in addition to `import pyspark.pandas as ps`):
{code:java}
>>> pdf = pd.DataFrame(
... {
... "A": [None, 3, None, None],
... "B": [2, 4, None, 3],
... "C": [None, None, None, 1],
... "D": [0, 1, 5, 4],
... },
... columns=["A", "B", "C", "D"],
... )
>>> psdf = ps.from_pandas(pdf)
>>> psdf.backfill()
/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/expressions.py:945:
UserWarning: WARN WindowExpression: No Partition Defined for Window operation!
Moving all data to a single partition, this can cause serious performance
degradation.
warnings.warn(
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
line 13391, in __repr__
pdf = cast("DataFrame",
self._get_or_create_repr_pandas_cache(max_display_count))
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
line 13382, in _get_or_create_repr_pandas_cache
self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
line 6354, in head
sdf = self._internal.resolved_copy.spark_frame
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/utils.py",
line 605, in wrapped_lazy_property
setattr(self, attr_name, fn(self))
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
line 1255, in resolved_copy
return self.copy(
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
line 1508, in copy
return InternalFrame(
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
line 634, in __init__
assert not spark_frame.isStreaming, "pandas-on-Spark does not support
Structured Streaming."
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/dataframe.py",
line 1671, in isStreaming
result = self._session.client._analyze(method="is_streaming",
plan=query).is_streaming
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 958, in _analyze
self._handle_error(error)
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 1195, in _handle_error
self._handle_rpc_error(error)
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 1231, in _handle_rpc_error
raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.AnalysisException: When resolving
'__index_level_0__, fail to find subplan with plan_id=14 in 'Project
['__index_level_0__, 'A, 'B, 'C, 'D, '__natural_order__]
+- Project [__index_level_0__#13701L, A#13725, B#13727, C#13729, D#13705L,
__natural_order__#13717L]
+- Project [__index_level_0__#13701L, D#13705L, __natural_order__#13717L,
A#13702, B#13703, C#13704, _we0#13733, _we1#13734, _we2#13735, CASE WHEN
(isnull(A#13702) OR isnan(A#13702)) AS A#13724 THEN _we0#13733 ELSE A#13702 END
AS A#13725, CASE WHEN (isnull(B#13703) OR isnan(B#13703)) AS B#13726 THEN
_we1#13734 ELSE B#13703 END AS B#13727, CASE WHEN (isnull(C#13704) OR
isnan(C#13704)) AS C#13728 THEN _we2#13735 ELSE C#13704 END AS C#13729]
+- Window [first(A#13702, true)
windowspecdefinition(__natural_order__#13717L ASC NULLS FIRST,
specifiedwindowframe(RowFrame, 1, unboundedfollowing$())) AS _we0#13733,
first(B#13703, true) windowspecdefinition(__natural_order__#13717L ASC NULLS
FIRST, specifiedwindowframe(RowFrame, 1, unboundedfollowing$())) AS _we1#13734,
first(C#13704, true) windowspecdefinition(__natural_order__#13717L ASC NULLS
FIRST, specifiedwindowframe(RowFrame, 1, unboundedfollowing$())) AS
_we2#13735], [__natural_order__#13717L ASC NULLS FIRST]
+- Project [__index_level_0__#13701L, D#13705L,
__natural_order__#13717L, A#13702, B#13703, C#13704]
+- Project [__index_level_0__#13701L, A#13702, B#13703, C#13704,
D#13705L, monotonically_increasing_id() AS __natural_order__#13717L]
+- Project [__index_level_0__#13691L AS
__index_level_0__#13701L, A#13692 AS A#13702, B#13693 AS B#13703, C#13694 AS
C#13704, D#13695L AS D#13705L]
+- LocalRelation [__index_level_0__#13691L, A#13692, B#13693,
C#13694, D#13695L] {code}
was:
Reproducible example:
{code:java}
>>> import pyspark.pandas as ps
>>> psdf1 = ps.DataFrame({"A": [1, 2, 3]})
>>> psdf2 = ps.DataFrame({"B": [1, 2, 3]})
>>> psdf1.append(psdf2)
/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py:8897:
FutureWarning: The DataFrame.append method is deprecated and will be removed in
a future version. Use pyspark.pandas.concat instead.
warnings.warn(
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
line 8930, in append
return cast(DataFrame, concat([self, other], ignore_index=ignore_index))
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/namespace.py",
line 2703, in concat
psdfs[0]._internal.copy(
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
line 1508, in copy
return InternalFrame(
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
line 753, in __init__
schema = spark_frame.select(data_spark_columns).schema
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/dataframe.py",
line 1650, in schema
return self._session.client.schema(query)
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 777, in schema
schema = self._analyze(method="schema", plan=plan).schema
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 958, in _analyze
self._handle_error(error)
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 1195, in _handle_error
self._handle_rpc_error(error)
File
"/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
line 1231, in _handle_rpc_error
raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.AnalysisException: When resolving 'A, fail to
find subplan with plan_id=16 in 'Project ['A, 'B]
+- Project [__index_level_0__#1101L, A#1102L, B#1157L,
monotonically_increasing_id() AS __natural_order__#1163L]
+- Union false, false
:- Project [__index_level_0__#1101L, A#1102L, cast(B#1116 as bigint) AS
B#1157L]
: +- Project [__index_level_0__#1101L, A#1102L, B#1116]
: +- Project [__index_level_0__#1101L, A#1102L,
__natural_order__#1108L, null AS B#1116]
: +- Project [__index_level_0__#1101L, A#1102L,
__natural_order__#1108L]
: +- Project [__index_level_0__#1101L, A#1102L,
monotonically_increasing_id() AS __natural_order__#1108L]
: +- Project [__index_level_0__#1097L AS
__index_level_0__#1101L, A#1098L AS A#1102L]
: +- LocalRelation [__index_level_0__#1097L, A#1098L]
+- Project [__index_level_0__#1137L, cast(A#1152 as bigint) AS A#1158L,
B#1138L]
+- Project [__index_level_0__#1137L, A#1152, B#1138L]
+- Project [__index_level_0__#1137L, B#1138L,
__natural_order__#1144L, null AS A#1152]
+- Project [__index_level_0__#1137L, B#1138L,
__natural_order__#1144L]
+- Project [__index_level_0__#1137L, B#1138L,
monotonically_increasing_id() AS __natural_order__#1144L]
+- Project [__index_level_0__#1133L AS
__index_level_0__#1137L, B#1134L AS B#1138L]
+- LocalRelation [__index_level_0__#1133L, B#1134L]
{code}
> Fix unexpected `AnalysisException` from Spark Connect client
> ------------------------------------------------------------
>
> Key: SPARK-43611
> URL: https://issues.apache.org/jira/browse/SPARK-43611
> Project: Spark
> Issue Type: Sub-task
> Components: Connect, Pandas API on Spark
> Affects Versions: 3.5.0
> Reporter: Haejoon Lee
> Priority: Major
>
> Reproducible example:
> {code:java}
> >>> import pyspark.pandas as ps
> >>> psdf1 = ps.DataFrame({"A": [1, 2, 3]})
> >>> psdf2 = ps.DataFrame({"B": [1, 2, 3]})
> >>> psdf1.append(psdf2)
> /Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py:8897:
> FutureWarning: The DataFrame.append method is deprecated and will be removed
> in a future version. Use pyspark.pandas.concat instead.
> warnings.warn(
> Traceback (most recent call last):
> File "<stdin>", line 1, in <module>
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
> line 8930, in append
> return cast(DataFrame, concat([self, other], ignore_index=ignore_index))
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/namespace.py",
> line 2703, in concat
> psdfs[0]._internal.copy(
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
> line 1508, in copy
> return InternalFrame(
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
> line 753, in __init__
> schema = spark_frame.select(data_spark_columns).schema
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/dataframe.py",
> line 1650, in schema
> return self._session.client.schema(query)
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
> line 777, in schema
> schema = self._analyze(method="schema", plan=plan).schema
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
> line 958, in _analyze
> self._handle_error(error)
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
> line 1195, in _handle_error
> self._handle_rpc_error(error)
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
> line 1231, in _handle_rpc_error
> raise convert_exception(info, status.message) from None
> pyspark.errors.exceptions.connect.AnalysisException: When resolving 'A, fail
> to find subplan with plan_id=16 in 'Project ['A, 'B]
> +- Project [__index_level_0__#1101L, A#1102L, B#1157L,
> monotonically_increasing_id() AS __natural_order__#1163L]
> +- Union false, false
> :- Project [__index_level_0__#1101L, A#1102L, cast(B#1116 as bigint) AS
> B#1157L]
> : +- Project [__index_level_0__#1101L, A#1102L, B#1116]
> : +- Project [__index_level_0__#1101L, A#1102L,
> __natural_order__#1108L, null AS B#1116]
> : +- Project [__index_level_0__#1101L, A#1102L,
> __natural_order__#1108L]
> : +- Project [__index_level_0__#1101L, A#1102L,
> monotonically_increasing_id() AS __natural_order__#1108L]
> : +- Project [__index_level_0__#1097L AS
> __index_level_0__#1101L, A#1098L AS A#1102L]
> : +- LocalRelation [__index_level_0__#1097L, A#1098L]
> +- Project [__index_level_0__#1137L, cast(A#1152 as bigint) AS A#1158L,
> B#1138L]
> +- Project [__index_level_0__#1137L, A#1152, B#1138L]
> +- Project [__index_level_0__#1137L, B#1138L,
> __natural_order__#1144L, null AS A#1152]
> +- Project [__index_level_0__#1137L, B#1138L,
> __natural_order__#1144L]
> +- Project [__index_level_0__#1137L, B#1138L,
> monotonically_increasing_id() AS __natural_order__#1144L]
> +- Project [__index_level_0__#1133L AS
> __index_level_0__#1137L, B#1134L AS B#1138L]
> +- LocalRelation [__index_level_0__#1133L, B#1134L]
> {code}
> Another example (requires `import pandas as pd` in addition to `import pyspark.pandas as ps`):
> {code:java}
> >>> pdf = pd.DataFrame(
> ... {
> ... "A": [None, 3, None, None],
> ... "B": [2, 4, None, 3],
> ... "C": [None, None, None, 1],
> ... "D": [0, 1, 5, 4],
> ... },
> ... columns=["A", "B", "C", "D"],
> ... )
> >>> psdf = ps.from_pandas(pdf)
> >>> psdf.backfill()
> /Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/expressions.py:945:
> UserWarning: WARN WindowExpression: No Partition Defined for Window
> operation! Moving all data to a single partition, this can cause serious
> performance degradation.
> warnings.warn(
> Traceback (most recent call last):
> File "<stdin>", line 1, in <module>
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
> line 13391, in __repr__
> pdf = cast("DataFrame",
> self._get_or_create_repr_pandas_cache(max_display_count))
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
> line 13382, in _get_or_create_repr_pandas_cache
> self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/frame.py",
> line 6354, in head
> sdf = self._internal.resolved_copy.spark_frame
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/utils.py",
> line 605, in wrapped_lazy_property
> setattr(self, attr_name, fn(self))
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
> line 1255, in resolved_copy
> return self.copy(
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
> line 1508, in copy
> return InternalFrame(
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/internal.py",
> line 634, in __init__
> assert not spark_frame.isStreaming, "pandas-on-Spark does not support
> Structured Streaming."
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/dataframe.py",
> line 1671, in isStreaming
> result = self._session.client._analyze(method="is_streaming",
> plan=query).is_streaming
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
> line 958, in _analyze
> self._handle_error(error)
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
> line 1195, in _handle_error
> self._handle_rpc_error(error)
> File
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/connect/client.py",
> line 1231, in _handle_rpc_error
> raise convert_exception(info, status.message) from None
> pyspark.errors.exceptions.connect.AnalysisException: When resolving
> '__index_level_0__, fail to find subplan with plan_id=14 in 'Project
> ['__index_level_0__, 'A, 'B, 'C, 'D, '__natural_order__]
> +- Project [__index_level_0__#13701L, A#13725, B#13727, C#13729, D#13705L,
> __natural_order__#13717L]
> +- Project [__index_level_0__#13701L, D#13705L, __natural_order__#13717L,
> A#13702, B#13703, C#13704, _we0#13733, _we1#13734, _we2#13735, CASE WHEN
> (isnull(A#13702) OR isnan(A#13702)) AS A#13724 THEN _we0#13733 ELSE A#13702
> END AS A#13725, CASE WHEN (isnull(B#13703) OR isnan(B#13703)) AS B#13726 THEN
> _we1#13734 ELSE B#13703 END AS B#13727, CASE WHEN (isnull(C#13704) OR
> isnan(C#13704)) AS C#13728 THEN _we2#13735 ELSE C#13704 END AS C#13729]
> +- Window [first(A#13702, true)
> windowspecdefinition(__natural_order__#13717L ASC NULLS FIRST,
> specifiedwindowframe(RowFrame, 1, unboundedfollowing$())) AS _we0#13733,
> first(B#13703, true) windowspecdefinition(__natural_order__#13717L ASC NULLS
> FIRST, specifiedwindowframe(RowFrame, 1, unboundedfollowing$())) AS
> _we1#13734, first(C#13704, true)
> windowspecdefinition(__natural_order__#13717L ASC NULLS FIRST,
> specifiedwindowframe(RowFrame, 1, unboundedfollowing$())) AS _we2#13735],
> [__natural_order__#13717L ASC NULLS FIRST]
> +- Project [__index_level_0__#13701L, D#13705L,
> __natural_order__#13717L, A#13702, B#13703, C#13704]
> +- Project [__index_level_0__#13701L, A#13702, B#13703, C#13704,
> D#13705L, monotonically_increasing_id() AS __natural_order__#13717L]
> +- Project [__index_level_0__#13691L AS
> __index_level_0__#13701L, A#13692 AS A#13702, B#13693 AS B#13703, C#13694 AS
> C#13704, D#13695L AS D#13705L]
> +- LocalRelation [__index_level_0__#13691L, A#13692,
> B#13693, C#13694, D#13695L] {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]