[
https://issues.apache.org/jira/browse/SPARK-42397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ted Chester Jenks updated SPARK-42397:
--------------------------------------
Description:
We are seeing inconsistent data returned when using `FlatMapCoGroupsInPandas`.
Consider the following PySpark example:
    # Imports implied by the `pd` and `F` names used below;
    # assumes an active SparkSession bound to `spark`.
    import pandas as pd
    from pyspark.sql import functions as F

    test_df = spark.createDataFrame(
        [
            ["1", "23", "abc", "blah", "def", "1"],
            ["1", "23", "abc", "blah", "def", "1"],
            ["1", "23", "abc", "blah", "def", "2"],
            ["1", "23", "abc", "blah", "def", "2"],
        ],
        ["cluster", "partition", "event", "abc", "def", "one_or_two"]
    )

    df1 = test_df.filter(
        F.col("one_or_two") == "1"
    ).select(
        "cluster", "event", "abc"
    )
    df2 = test_df.filter(
        F.col("one_or_two") == "2"
    ).select(
        "cluster", "event", "def"
    )

    # Report the column labels that each side of the cogroup receives.
    def get_schema(l, r):
        return pd.DataFrame(
            [(str(l.columns), str(r.columns))],
            columns=["left_colms", "right_colms"]
        )

    grouped_df = df1.groupBy("cluster").cogroup(df2.groupBy("cluster")).applyInPandas(
        get_schema, "left_colms string, right_colms string"
    )
    grouped_df_1 = grouped_df.withColumn(
        "xyz", F.lit("1234")
    )
When we call `grouped_df.collect()` we get:
    [Row(left_colms="Index(['cluster', 'event', 'abc'], dtype='object')",
         right_colms="Index(['cluster', 'event', 'def'], dtype='object')")]
When we call `grouped_df.show(5, truncate=False)` we get:
    +-----------------------------------------+--------------------------------------------------+
    |left_colms                               |right_colms                                       |
    +-----------------------------------------+--------------------------------------------------+
    |Index(['cluster', 'abc'], dtype='object')|Index(['cluster', 'event', 'def'], dtype='object')|
    +-----------------------------------------+--------------------------------------------------+
When we call `grouped_df_1.collect()` we get:
    [Row(left_colms="Index(['cluster', 'abc'], dtype='object')",
         right_colms="Index(['cluster', 'event', 'def'], dtype='object')", xyz='1234')]
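In the outputs above, the left-hand frame is missing the `event` column in the `show()` and `grouped_df_1.collect()` results, while the plain `grouped_df.collect()` result includes it. As a point of comparison, the following is a minimal pandas-only sketch of what `get_schema` returns when both sides carry all three selected columns. The `left` and `right` frames are hypothetical stand-ins for the single cogroup (`cluster == "1"`) built from the rows in the report; they are not taken from Spark itself.

```python
import pandas as pd


def get_schema(l, r):
    # Same function as in the report: record the column labels each side receives.
    return pd.DataFrame(
        [(str(l.columns), str(r.columns))],
        columns=["left_colms", "right_colms"],
    )


# Hypothetical stand-ins for the cogroup that df1 and df2 would contribute.
left = pd.DataFrame(
    {"cluster": ["1", "1"], "event": ["abc", "abc"], "abc": ["blah", "blah"]}
)
right = pd.DataFrame(
    {"cluster": ["1", "1"], "event": ["abc", "abc"], "def": ["def", "def"]}
)

result = get_schema(left, right)
print(result.to_dict("records"))
```

If the grouped frames reach the function unchanged, `left_colms` should mention `cluster`, `event`, and `abc`, which matches the `grouped_df.collect()` output and not the other two.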
was:
We are seeing inconsistent data returned when using `FlatMapCoGroupsInPandas`.
In the PySpark example:
```
test_df = spark.createDataFrame(
    [
        ["1", "23", "abc", "blah", "def", "1"],
        ["1", "23", "abc", "blah", "def", "1"],
        ["1", "23", "abc", "blah", "def", "2"],
        ["1", "23", "abc", "blah", "def", "2"],
    ],
    ["cluster", "partition", "event", "abc", "def", "one_or_two"]
)
df1 = test_df.filter(
    F.col("one_or_two") == "1"
).select(
    "cluster", "event", "abc"
)
df2 = test_df.filter(
    F.col("one_or_two") == "2"
).select(
    "cluster", "event", "def"
)
def get_schema(l, r):
    return pd.DataFrame(
        [(str(l.columns), str(r.columns))],
        columns=["left_colms", "right_colms"]
    )
grouped_df = df1.groupBy("cluster").cogroup(df2.groupBy("cluster")).applyInPandas(
    get_schema, "left_colms string, right_colms string"
)
grouped_df_1 = grouped_df.withColumn(
    "xyz", F.lit("1234")
)
```
When we call `grouped_df.collect()` we get:
```
[Row(left_colms="Index(['cluster', 'event', 'abc'], dtype='object')",
right_colms="Index(['cluster', 'event', 'def'], dtype='object')")]
```
When we call `grouped_df.show(5, truncate=False)` we get:
```
+-----------------------------------------+--------------------------------------------------+
|left_colms                               |right_colms                                       |
+-----------------------------------------+--------------------------------------------------+
|Index(['cluster', 'abc'], dtype='object')|Index(['cluster', 'event', 'def'], dtype='object')|
+-----------------------------------------+--------------------------------------------------+
```
When we call `grouped_df_1.collect()` we get:
```
[Row(left_colms="Index(['cluster', 'abc'], dtype='object')",
right_colms="Index(['cluster', 'event', 'def'], dtype='object')", xyz='1234')]
```
> Inconsistent data produced by `FlatMapCoGroupsInPandas`
> -------------------------------------------------------
>
> Key: SPARK-42397
> URL: https://issues.apache.org/jira/browse/SPARK-42397
> Project: Spark
> Issue Type: Bug
> Components: Pandas API on Spark, SQL
> Affects Versions: 3.3.0, 3.3.1
> Reporter: Ted Chester Jenks
> Priority: Minor
>