Dieter De Paepe created SPARK-33150:
---------------------------------------
Summary: Groupby key may not be unique when using window
Key: SPARK-33150
URL: https://issues.apache.org/jira/browse/SPARK-33150
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 3.0.0, 2.3.3
Reporter: Dieter De Paepe
Due to the way Spark converts dates to local times, it may lose the
details that allow it to differentiate instants whose local times fall in the
daylight saving time transition. Setting the Spark session timezone to UTC does
not resolve the issue.
This issue is somewhat related to SPARK-32123, but seems independent enough to
consider this a separate issue.
A minimal example is below. I tested these on Spark 3.0.0 and 2.3.3 (I could
not get 2.4.x to work on my system). My machine is located in timezone
"Europe/Brussels".
{code:java}
import pyspark
import pyspark.sql.functions as f
spark = (pyspark
    .sql
    .SparkSession
    .builder
    .master('local[1]')
    .config('spark.sql.session.timeZone', 'UTC')
    .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
    .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
    .getOrCreate()
)
debug_df = spark.createDataFrame([
    (1572137640, 1),
    (1572137640, 2),
    (1572141240, 3),
    (1572141240, 4)
], ['epochtime', 'value'])
debug_df \
    .withColumn('time', f.from_unixtime('epochtime')) \
    .withColumn('window', f.window('time', '1 minute').start) \
    .collect()
{code}
Output. Here we see that the window function internally transforms the times to
local time, and as such has to disambiguate between the Belgian summer and
winter hour during the DST transition by setting the "fold" attribute:
{code:java}
[Row(epochtime=1572137640, value=1, time='2019-10-27 00:54:00',
     window=datetime.datetime(2019, 10, 27, 2, 54)),
 Row(epochtime=1572137640, value=2, time='2019-10-27 00:54:00',
     window=datetime.datetime(2019, 10, 27, 2, 54)),
 Row(epochtime=1572141240, value=3, time='2019-10-27 01:54:00',
     window=datetime.datetime(2019, 10, 27, 2, 54, fold=1)),
 Row(epochtime=1572141240, value=4, time='2019-10-27 01:54:00',
     window=datetime.datetime(2019, 10, 27, 2, 54, fold=1))]{code}
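The ambiguity itself can be reproduced with the standard library alone, independent of Spark. A minimal sketch (assumes Python 3.9+ for {{zoneinfo}}), using the same two epoch values as above:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

brussels = ZoneInfo("Europe/Brussels")

# The two distinct instants from the example above, one hour apart in UTC...
a = datetime.fromtimestamp(1572137640, tz=brussels)  # before DST ends at 03:00 CEST
b = datetime.fromtimestamp(1572141240, tz=brussels)  # after DST ends

# ...map to the same local wall-clock time, 2019-10-27 02:54:00,
# distinguished only by the fold attribute.
print(a.replace(tzinfo=None), a.fold)  # 2019-10-27 02:54:00 0
print(b.replace(tzinfo=None), b.fold)  # 2019-10-27 02:54:00 1
```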
Now, this has severe implications when we use the window function for a groupby
operation:
{code:java}
output = debug_df \
    .withColumn('time', f.from_unixtime('epochtime')) \
    .groupby(f.window('time', '1 minute').start.alias('window')) \
    .agg(f.min('value').alias('min_value'))
output_collect = output.collect()
output_pandas = output.toPandas()
print(output_collect)
print(output_pandas)
{code}
Output:
{code:java}
[Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1),
Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]
                window  min_value
0  2019-10-27 00:54:00          1
1  2019-10-27 00:54:00          3
{code}
The collect() output uses Belgian local time, which lets us visually
differentiate the two keys via the fold attribute. However, due to the way the
fold attribute is defined, [it is ignored
for|https://www.python.org/dev/peps/pep-0495/#the-fold-attribute] equality
comparison.
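This can be verified with plain datetime objects: per PEP 495, two naive datetimes that differ only in fold compare (and hash) as equal:

```python
from datetime import datetime

d0 = datetime(2019, 10, 27, 2, 54)          # first occurrence (fold defaults to 0)
d1 = datetime(2019, 10, 27, 2, 54, fold=1)  # second occurrence of the same wall time

print(d0 == d1)              # True: fold is ignored in equality comparison
print(hash(d0) == hash(d1))  # True: and in hashing
print(d0.fold == d1.fold)    # False: the attribute itself still differs
```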
The pandas output, on the other hand, uses UTC times (due to the
spark.sql.session.timeZone setting), but it has lost the disambiguating fold
attribute in the conversion to pandas datatypes.
In both cases, the grouped-on column is not unique.
{code:java}
print(output_collect[0].window == output_collect[1].window) # True
print(output_collect[0].window.fold == output_collect[1].window.fold) # False
print(output_pandas.window[0] == output_pandas.window[1]) # True
print(output_pandas.window[0].fold == output_pandas.window[1].fold) # True
{code}
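One possible client-side workaround (a sketch, not a fix for the Spark behaviour): re-attach the timezone the keys were produced in, so the fold attribute resolves back into a distinct UTC offset and the keys become distinguishable instants again. This assumes Python 3.9+ for {{zoneinfo}} and that the keys correspond to "Europe/Brussels" local time, as in the collect() output above:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

brussels = ZoneInfo("Europe/Brussels")

# The two collected group keys, equal as naive datetimes despite fold:
k0 = datetime(2019, 10, 27, 2, 54)
k1 = datetime(2019, 10, 27, 2, 54, fold=1)

# replace() preserves fold, so the ambiguous wall time resolves to two
# different UTC offsets (+02:00 vs +01:00) and hence distinct epochs.
t0 = k0.replace(tzinfo=brussels).timestamp()  # 1572137640.0
t1 = k1.replace(tzinfo=brussels).timestamp()  # 1572141240.0
print(t0 != t1)  # True: the keys are unique again
```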
--
This message was sent by Atlassian Jira
(v8.3.4#803005)