Paul Zaczkieiwcz created SPARK-12076:
----------------------------------------

             Summary: countDistinct behaves inconsistently
                 Key: SPARK-12076
                 URL: https://issues.apache.org/jira/browse/SPARK-12076
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.5.1
            Reporter: Paul Zaczkieiwcz
            Priority: Minor


Assume:
{code:java}
val slicePlayed:DataFrame = _
val joinKeys:DataFrame = _
{code}

Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} 
and all columns beginning with "join_" are from {{joinKeys}}.  The following 
queries can return different values for slice_count_distinct:

{code:java}
slicePlayed.join(
  joinKeys,
  ( 
    $"join_session_id" === $"cdnt_session_id" &&
    $"join_asset_id" === $"cdnt_asset_id" &&
    $"join_euid" === $"cdnt_euid"
  ),
  "inner"
).groupBy(
  $"cdnt_session_id".as("slice_played_session_id"),
  $"cdnt_asset_id".as("slice_played_asset_id"),
  $"cdnt_euid".as("slice_played_euid")
).agg(
  countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
  count($"cdnt_slice_number").as("slice_count_total"),
  min($"cdnt_slice_number").as("min_slice_number"),
  max($"cdnt_slice_number").as("max_slice_number")
).show(false)
{code}
{code:java}
slicePlayed.join(
  joinKeys,
  ( 
    $"join_session_id" === $"cdnt_session_id" &&
    $"join_asset_id" === $"cdnt_asset_id" &&
    $"join_euid" === $"cdnt_euid"
  ),
  "inner"
).groupBy(
  $"cdnt_session_id".as("slice_played_session_id"),
  $"cdnt_asset_id".as("slice_played_asset_id"),
  $"cdnt_euid".as("slice_played_euid")
).agg(
  min($"cdnt_event_time").as("slice_start_time"),
  min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
  min($"cdnt_user_ip").as("slice_played_user_ip"),
  min($"cdnt_user_agent").as("slice_played_user_agent"),
  min($"cdnt_referer").as("slice_played_referer"),
  max($"cdnt_event_time").as("slice_end_time"),
  countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
  count($"cdnt_slice_number").as("slice_count_total"),
  min($"cdnt_slice_number").as("min_slice_number"),
  max($"cdnt_slice_number").as("max_slice_number"),
  min($"cdnt_is_live").as("is_live")
).show(false)
{code}

The +only+ difference between the two queries are that I'm adding more columns 
to the {{agg}} method.

I can't reproduce by manually creating a dataFrame from 
{{DataFrame.parallelize}}. The original sources of the dataFrames are parquet 
files.

The explain plans for the two queries are slightly different.
{code}
== Physical Plan ==
TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], 
functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)],
 
output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
 
TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
 
functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)],
 
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
  
TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
 
functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)],
 
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
   TungstenProject 
[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
    SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], 
[join_session_id#41,join_asset_id#42,join_euid#43]
     TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 
ASC], false, 0
      TungstenExchange 
hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
       ConvertToUnsafe
        Scan 
ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
     TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 
ASC], false, 0
      TungstenExchange 
hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
       ConvertToUnsafe
        Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
{code}
{code}
== Physical Plan ==
SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], 
functions=[(max(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_is_live#18),mode=Final,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_user_ip#31),mode=Final,isDistinct=false),(min(cdnt_user_agent#30),mode=Final,isDistinct=false),(min(cdnt_referer#22),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)],
 
output=[slice_played_session_id#721,slice_played_asset_id#722,slice_played_euid#723,slice_start_time#724,slice_played_playing_owner_id#725,slice_played_user_ip#726,slice_played_user_agent#727,slice_played_referer#728,slice_end_time#729,slice_count_distinct#730L,slice_count_total#731L,min_slice_number#732L,max_slice_number#733L,is_live#734])
 
SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
 
functions=[(max(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_is_live#18),mode=PartialMerge,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_user_ip#31),mode=PartialMerge,isDistinct=false),(min(cdnt_user_agent#30),mode=PartialMerge,isDistinct=false),(min(cdnt_referer#22),mode=PartialMerge,isDistinct=false)],
 
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
  
SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
 
functions=[(max(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_is_live#18),mode=Partial,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_user_ip#31),mode=Partial,isDistinct=false),(min(cdnt_user_agent#30),mode=Partial,isDistinct=false),(min(cdnt_referer#22),mode=Partial,isDistinct=false)],
 
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
   ConvertToSafe
    TungstenProject 
[cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
     SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], 
[join_session_id#41,join_asset_id#42,join_euid#43]
      TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 
ASC], false, 0
       TungstenExchange 
hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
        ConvertToUnsafe
         Scan 
ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
      TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 
ASC], false, 0
       TungstenExchange 
hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
        ConvertToUnsafe
         Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
{code}
The biggest difference betwen the two plans is whether TungstenAggregate is 
used or whether SortBasedAggregate+ConvertToSafe is used. The 
SortBasedAggregate+ConvertToSafe method gives the inaccurate results. I've been 
able to get around this issue by adding a {{sortBy}} call before the 
{{groupBy}} clause, but it strikes me that this particular calculation 
shouldn't change by adding a manual sort in an intermediate step.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to