Dieter De Paepe created SPARK-33150:
---------------------------------------

             Summary: Groupby key may not be unique when using window
                 Key: SPARK-33150
                 URL: https://issues.apache.org/jira/browse/SPARK-33150
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.0.0, 2.3.3
            Reporter: Dieter De Paepe

Due to the way Spark converts dates to local times, it may lose the detail 
needed to differentiate instants whose local times fall in the daylight 
saving time transition. Setting the Spark session timezone to UTC does not 
resolve the issue.
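To make the ambiguity concrete, here is a minimal plain-Python illustration (independent of Spark, using zoneinfo from Python 3.9+) with the same two epoch values used in the reproduction below: both instants map to the same Brussels wall-clock time, and only the PEP 495 "fold" attribute tells them apart.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

brussels = ZoneInfo("Europe/Brussels")

# The same two epoch seconds used in the reproduction below:
t1 = datetime.fromtimestamp(1572137640, tz=timezone.utc)  # 2019-10-27 00:54:00 UTC
t2 = datetime.fromtimestamp(1572141240, tz=timezone.utc)  # 2019-10-27 01:54:00 UTC

# Both instants land on 02:54 Brussels wall-clock time; only the
# PEP 495 "fold" attribute distinguishes them.
l1 = t1.astimezone(brussels)
l2 = t2.astimezone(brussels)
print(l1.replace(tzinfo=None), l1.fold)  # 2019-10-27 02:54:00 0
print(l2.replace(tzinfo=None), l2.fold)  # 2019-10-27 02:54:00 1
```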

This issue is somewhat related to SPARK-32123, but seems independent enough 
to be treated as a separate issue.

A minimal example is below. I tested these on Spark 3.0.0 and 2.3.3 (I could 
not get 2.4.x to work on my system). My machine is located in timezone 
"Europe/Brussels".

 
{code:python}
import pyspark
import pyspark.sql.functions as f

spark = (
    pyspark.sql.SparkSession.builder
    .master('local[1]')
    .config('spark.sql.session.timeZone', 'UTC')
    .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
    .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
    .getOrCreate()
)

debug_df = spark.createDataFrame([
    (1572137640, 1),
    (1572137640, 2),
    (1572141240, 3),
    (1572141240, 4),
], ['epochtime', 'value'])

debug_df \
    .withColumn('time', f.from_unixtime('epochtime')) \
    .withColumn('window', f.window('time', '1 minute').start) \
    .collect()
{code}
 

Output: here we see that the window function internally converts the times to 
local time, and therefore has to disambiguate between the Belgian winter- and 
summer-hour sides of the transition by setting the "fold" attribute:

 
{code:java}
[Row(epochtime=1572137640, value=1, time='2019-10-27 00:54:00', window=datetime.datetime(2019, 10, 27, 2, 54)),
 Row(epochtime=1572137640, value=2, time='2019-10-27 00:54:00', window=datetime.datetime(2019, 10, 27, 2, 54)),
 Row(epochtime=1572141240, value=3, time='2019-10-27 01:54:00', window=datetime.datetime(2019, 10, 27, 2, 54, fold=1)),
 Row(epochtime=1572141240, value=4, time='2019-10-27 01:54:00', window=datetime.datetime(2019, 10, 27, 2, 54, fold=1))]{code}
 

Now, this has severe implications when we use the window function for a groupby 
operation:

 
{code:python}
output = debug_df \
 .withColumn('time', f.from_unixtime('epochtime')) \
 .groupby(f.window('time', '1 minute').start.alias('window')).agg(
   f.min('value').alias('min_value')
 )
output_collect = output.collect()
output_pandas = output.toPandas()
print(output_collect)
print(output_pandas)
{code}
Output:

 
{code:java}
[Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1), 
Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]

  window              min_value
0 2019-10-27 00:54:00 1
1 2019-10-27 00:54:00 3
{code}
 

The collect() output shows Belgian local time, and the fold attribute at 
least lets us tell the two keys apart visually. However, due to the way the 
fold attribute is defined, [it is ignored 
for|https://www.python.org/dev/peps/pep-0495/#the-fold-attribute] equality 
comparison.
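The fold-blind comparison is easy to reproduce with plain datetime objects, independent of Spark:

```python
from datetime import datetime

a = datetime(2019, 10, 27, 2, 54)          # fold=0 (the default)
b = datetime(2019, 10, 27, 2, 54, fold=1)  # same wall-clock time, fold=1

# Per PEP 495, fold is ignored in comparisons and in hashing, so the
# two "distinct" group keys collide.
print(a == b)              # True
print(a.fold == b.fold)    # False
print(hash(a) == hash(b))  # True
```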

The pandas output, on the other hand, shows the UTC times (due to the 
spark.sql.session.timeZone setting), but the disambiguating fold attribute 
was lost in the conversion to pandas data types.

In both cases, the column that was grouped on does not contain unique keys.

 
{code:java}
print(output_collect[0].window == output_collect[1].window)  # True
print(output_collect[0].window.fold == output_collect[1].window.fold)  # False
print(output_pandas.window[0] == output_pandas.window[1])  # True
print(output_pandas.window[0].fold == output_pandas.window[1].fold)  # True
{code}
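One possible workaround (a sketch, not an official fix) is to build the grouping key from the raw epoch value, so that the ambiguous local hour never enters the key at all. In plain Python the idea looks like this (names are illustrative):

```python
from collections import defaultdict

rows = [(1572137640, 1), (1572137640, 2), (1572141240, 3), (1572141240, 4)]

# Bucket by the epoch second floored to the minute -- no local-time
# conversion happens, so the two ambiguous instants keep distinct keys.
groups = defaultdict(list)
for epoch, value in rows:
    groups[epoch - epoch % 60].append(value)

print({k: min(v) for k, v in groups.items()})
# {1572137640: 1, 1572141240: 3}
```

In PySpark the equivalent would be grouping on something like f.col('epochtime') - f.col('epochtime') % 60 instead of f.window('time', '1 minute').start.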
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
