Paul Zaczkieiwcz created SPARK-12076:
----------------------------------------
Summary: countDistinct behaves inconsistently
Key: SPARK-12076
URL: https://issues.apache.org/jira/browse/SPARK-12076
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.5.1
Reporter: Paul Zaczkieiwcz
Priority: Minor
Assume:
{code:java}
val slicePlayed:DataFrame = _
val joinKeys:DataFrame = _
{code}
Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}}
and all columns beginning with "join_" are from {{joinKeys}}. The following
queries can return different values for slice_count_distinct:
{code:java}
slicePlayed.join(
joinKeys,
(
$"join_session_id" === $"cdnt_session_id" &&
$"join_asset_id" === $"cdnt_asset_id" &&
$"join_euid" === $"cdnt_euid"
),
"inner"
).groupBy(
$"cdnt_session_id".as("slice_played_session_id"),
$"cdnt_asset_id".as("slice_played_asset_id"),
$"cdnt_euid".as("slice_played_euid")
).agg(
countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
count($"cdnt_slice_number").as("slice_count_total"),
min($"cdnt_slice_number").as("min_slice_number"),
max($"cdnt_slice_number").as("max_slice_number")
).show(false)
{code}
{code:java}
slicePlayed.join(
joinKeys,
(
$"join_session_id" === $"cdnt_session_id" &&
$"join_asset_id" === $"cdnt_asset_id" &&
$"join_euid" === $"cdnt_euid"
),
"inner"
).groupBy(
$"cdnt_session_id".as("slice_played_session_id"),
$"cdnt_asset_id".as("slice_played_asset_id"),
$"cdnt_euid".as("slice_played_euid")
).agg(
min($"cdnt_event_time").as("slice_start_time"),
min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
min($"cdnt_user_ip").as("slice_played_user_ip"),
min($"cdnt_user_agent").as("slice_played_user_agent"),
min($"cdnt_referer").as("slice_played_referer"),
max($"cdnt_event_time").as("slice_end_time"),
countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
count($"cdnt_slice_number").as("slice_count_total"),
min($"cdnt_slice_number").as("min_slice_number"),
max($"cdnt_slice_number").as("max_slice_number"),
min($"cdnt_is_live").as("is_live")
).show(false)
{code}
The +only+ difference between the two queries are that I'm adding more columns
to the {{agg}} method.
I can't reproduce by manually creating a dataFrame from
{{DataFrame.parallelize}}. The original sources of the dataFrames are parquet
files.
The explain plans for the two queries are slightly different.
{code}
== Physical Plan ==
TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13],
functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)],
output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)],
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)],
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
TungstenProject
[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13],
[join_session_id#41,join_asset_id#42,join_euid#43]
TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13
ASC], false, 0
TungstenExchange
hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
ConvertToUnsafe
Scan
ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43
ASC], false, 0
TungstenExchange
hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
ConvertToUnsafe
Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
{code}
{code}
== Physical Plan ==
SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13],
functions=[(max(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_is_live#18),mode=Final,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_user_ip#31),mode=Final,isDistinct=false),(min(cdnt_user_agent#30),mode=Final,isDistinct=false),(min(cdnt_referer#22),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)],
output=[slice_played_session_id#721,slice_played_asset_id#722,slice_played_euid#723,slice_start_time#724,slice_played_playing_owner_id#725,slice_played_user_ip#726,slice_played_user_agent#727,slice_played_referer#728,slice_end_time#729,slice_count_distinct#730L,slice_count_total#731L,min_slice_number#732L,max_slice_number#733L,is_live#734])
SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
functions=[(max(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_is_live#18),mode=PartialMerge,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_user_ip#31),mode=PartialMerge,isDistinct=false),(min(cdnt_user_agent#30),mode=PartialMerge,isDistinct=false),(min(cdnt_referer#22),mode=PartialMerge,isDistinct=false)],
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
functions=[(max(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_is_live#18),mode=Partial,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_user_ip#31),mode=Partial,isDistinct=false),(min(cdnt_user_agent#30),mode=Partial,isDistinct=false),(min(cdnt_referer#22),mode=Partial,isDistinct=false)],
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
ConvertToSafe
TungstenProject
[cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13],
[join_session_id#41,join_asset_id#42,join_euid#43]
TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13
ASC], false, 0
TungstenExchange
hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
ConvertToUnsafe
Scan
ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43
ASC], false, 0
TungstenExchange
hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
ConvertToUnsafe
Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
{code}
The biggest difference betwen the two plans is whether TungstenAggregate is
used or whether SortBasedAggregate+ConvertToSafe is used. The
SortBasedAggregate+ConvertToSafe method gives the inaccurate results. I've been
able to get around this issue by adding a {{sortBy}} call before the
{{groupBy}} clause, but it strikes me that this particular calculation
shouldn't change by adding a manual sort in an intermediate step.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]