ShreyeshArangath opened a new issue, #1840:
URL: https://github.com/apache/auron/issues/1840

   **Describe the bug**
   Spark's `collect_set` preserves first-occurrence order when there is _no 
shuffle_, and the upstream `DataFrameAggregateSuite` depends on that behavior. 
Auron's native collect aggregations emit the same unique values, but in 
whatever order the native accumulator happens to flush them, so the shared 
Spark test `collect functions structs` fails even though both sides contain 
the same elements.
   
   **To Reproduce**
   
   1. Comment out `.excludeByPrefix("collect functions")` in 
`AuronSparkSettings.scala`.
   2. Run `AuronDataFrameAggregateSuite` under `auron-spark-tests/spark33`.
   3. Observe the failure in `test("collect functions structs")`, which performs:
   ```scala
   df.select(collect_set($"a"), sort_array(collect_set($"b")))
   ```
   Correct Spark answer: `[Seq(1, 2, 3), Seq(Row(2, 2), Row(4, 1))]`
   Native answer: `[ArrayBuffer(2, 3, 1), ArrayBuffer(Row(2, 2), Row(4, 1))]`
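   The mismatch can be sketched with plain Scala collections (illustration 
only; `CollectSetOrderSketch` and the flush order are made up for this sketch, 
not Auron or Spark internals):

   ```scala
   import scala.collection.mutable

   // Sketch: why a test that asserts first-occurrence order fails against an
   // unordered accumulator, and how an order-insensitive comparison would
   // accept both results.
   object CollectSetOrderSketch {
     def main(args: Array[String]): Unit = {
       val column = Seq(1, 2, 3, 2, 1) // input column with duplicates

       // First-occurrence order, the order the upstream test asserts:
       val firstSeen = mutable.LinkedHashSet.empty[Int]
       column.foreach(firstSeen += _)
       assert(firstSeen.toSeq == Seq(1, 2, 3))

       // A native accumulator may flush the same unique values in any order:
       val nativeFlush = Seq(2, 3, 1) // hypothetical flush order

       // Element-wise comparison fails ...
       assert(firstSeen.toSeq != nativeFlush)
       // ... but as sets the two results agree:
       assert(firstSeen.toSet == nativeFlush.toSet)
     }
   }
   ```

   So a fix could go either way: make the native accumulator track insertion 
order (as the `LinkedHashSet` above does), or compare collect results 
order-insensitively in the shared tests.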
   
   **Expected behavior**
   Auron's native `collect_set` should match Spark's behavior: preserve 
first-occurrence order when there is no shuffle, so the shared 
`DataFrameAggregateSuite` tests pass.
   
   
   **Full logs**
   
   ```
   - collect functions *** FAILED ***
     Results do not match for query:
     Timezone: 
sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
     Timezone Env: 
     
     == Parsed Logical Plan ==
     'Project [unresolvedalias(collect_set('a, 0, 0), 
Some(org.apache.spark.sql.Column$$Lambda$4163/0x0000000801973840@15a95b85)), 
unresolvedalias(collect_set('b, 0, 0), 
Some(org.apache.spark.sql.Column$$Lambda$4163/0x0000000801973840@15a95b85))]
     +- Project [_1#42392 AS a#42397, _2#42393 AS b#42398]
        +- LocalRelation [_1#42392, _2#42393]
     
     == Analyzed Logical Plan ==
     collect_set(a): array<int>, collect_set(b): array<int>
     Aggregate [collect_set(a#42397, 0, 0) AS collect_set(a)#42541, 
collect_set(b#42398, 0, 0) AS collect_set(b)#42542]
     +- Project [_1#42392 AS a#42397, _2#42393 AS b#42398]
        +- LocalRelation [_1#42392, _2#42393]
     
     == Optimized Logical Plan ==
     Aggregate [collect_set(a#42397, 0, 0) AS collect_set(a)#42541, 
collect_set(b#42398, 0, 0) AS collect_set(b)#42542]
     +- Project [_1#42392 AS a#42397, _2#42393 AS b#42398]
        +- LocalRelation [_1#42392, _2#42393]
     
     == Physical Plan ==
     AdaptiveSparkPlan isFinalPlan=true
     +- == Final Plan ==
        NativeProject [collect_set(a#42397, 0, 0)#42539 AS 
collect_set(a)#42541, collect_set(b#42398, 0, 0)#42540 AS 
collect_set(b)#42542]
        +- NativeHashAggregate HashAgg, List(), [collect_set(a#42397, 0, 
0), collect_set(b#42398, 0, 0)], [collect_set(a#42397, 0, 0)#42539, 
collect_set(b#42398, 0, 0)#42540], 0
           +- ShuffleQueryStage 0
              +- NativeShuffleExchange SinglePartition, 
ENSURE_REQUIREMENTS, [plan_id=7173]
                 +- NativeHashAggregate HashAgg, 
[partial_collect_set(a#42397, 0, 0), partial_collect_set(b#42398, 0, 0)], 
[buf#42546, buf#42547], 0
                    +- NativeProject [_1#42392 AS a#42397, _2#42393 AS 
b#42398]
                       +- ConvertToNative
                          +- LocalTableScan [_1#42392, _2#42393]
     +- == Initial Plan ==
        ObjectHashAggregate(keys=[], functions=[collect_set(a#42397, 0, 
0), collect_set(b#42398, 0, 0)], output=[collect_set(a)#42541, 
collect_set(b)#42542])
        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, 
[plan_id=7152]
           +- ObjectHashAggregate(keys=[], 
functions=[partial_collect_set(a#42397, 0, 0), partial_collect_set(b#42398, 0, 
0)], output=[buf#42548, buf#42549])
              +- Project [_1#42392 AS a#42397, _2#42393 AS b#42398]
                 +- LocalTableScan [_1#42392, _2#42393]
     
     == Results ==
     
     == Results ==
     !== Correct Answer - 1 ==     == Auron Answer - 1 ==
     !struct<>                     
struct<collect_set(a):array<int>,collect_set(b):array<int>>
     ![List(1, 2, 3),List(2, 4)]   [ArrayBuffer(2, 3, 1),ArrayBuffer(2, 
4)] (SparkQueryTestsBase.scala:73)
   - collect functions structs *** FAILED ***
     Results do not match for query:
     Timezone: 
sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
     Timezone Env: 
     
     == Parsed Logical Plan ==
     'Project [unresolvedalias(collect_set('a, 0, 0), 
Some(org.apache.spark.sql.Column$$Lambda$4163/0x0000000801973840@15a95b85)), 
unresolvedalias(sort_array(collect_set('b, 0, 0), true), 
Some(org.apache.spark.sql.Column$$Lambda$4163/0x0000000801973840@15a95b85))]
     +- Project [a#42667, struct(x, x#42668, y, y#42669) AS b#42673]
        +- Project [_1#42660 AS a#42667, _2#42661 AS x#42668, _3#42662 AS 
y#42669]
           +- LocalRelation [_1#42660, _2#42661, _3#42662]
     
     == Analyzed Logical Plan ==
     collect_set(a): array<int>, sort_array(collect_set(b), true): 
array<struct<x:int,y:int>>
     Aggregate [collect_set(a#42667, 0, 0) AS collect_set(a)#42816, 
sort_array(collect_set(b#42673, 0, 0), true) AS sort_array(collect_set(b), 
true)#42817]
     +- Project [a#42667, struct(x, x#42668, y, y#42669) AS b#42673]
        +- Project [_1#42660 AS a#42667, _2#42661 AS x#42668, _3#42662 AS 
y#42669]
           +- LocalRelation [_1#42660, _2#42661, _3#42662]
     
     == Optimized Logical Plan ==
     Aggregate [collect_set(a#42667, 0, 0) AS collect_set(a)#42816, 
sort_array(collect_set(b#42673, 0, 0), true) AS sort_array(collect_set(b), 
true)#42817]
     +- Project [_1#42660 AS a#42667, struct(x, _2#42661, y, _3#42662) AS 
b#42673]
        +- LocalRelation [_1#42660, _2#42661, _3#42662]
     
     == Physical Plan ==
     AdaptiveSparkPlan isFinalPlan=true
     +- == Final Plan ==
        NativeProject [collect_set(a#42667, 0, 0)#42814 AS 
collect_set(a)#42816, sort_array(collect_set(b#42673, 0, 0)#42815, true) AS 
sort_array(collect_set(b), true)#42817]
        +- NativeHashAggregate HashAgg, List(), [collect_set(a#42667, 0, 
0), collect_set(b#42673, 0, 0)], [collect_set(a#42667, 0, 0)#42814, 
collect_set(b#42673, 0, 0)#42815], 0
           +- ShuffleQueryStage 0
              +- NativeShuffleExchange SinglePartition, 
ENSURE_REQUIREMENTS, [plan_id=7425]
                 +- NativeHashAggregate HashAgg, 
[partial_collect_set(a#42667, 0, 0), partial_collect_set(b#42673, 0, 0)], 
[buf#42821, buf#42822], 0
                    +- NativeProject [_1#42660 AS a#42667, struct(x, 
_2#42661, y, _3#42662) AS b#42673]
                       +- ConvertToNative
                          +- LocalTableScan [_1#42660, _2#42661, 
_3#42662]
     +- == Initial Plan ==
        ObjectHashAggregate(keys=[], functions=[collect_set(a#42667, 0, 
0), collect_set(b#42673, 0, 0)], output=[collect_set(a)#42816, 
sort_array(collect_set(b), true)#42817])
        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, 
[plan_id=7404]
           +- ObjectHashAggregate(keys=[], 
functions=[partial_collect_set(a#42667, 0, 0), partial_collect_set(b#42673, 0, 
0)], output=[buf#42823, buf#42824])
              +- Project [_1#42660 AS a#42667, struct(x, _2#42661, y, 
_3#42662) AS b#42673]
                 +- LocalTableScan [_1#42660, _2#42661, _3#42662]
     
     == Results ==
     
     == Results ==
     !== Correct Answer - 1 ==             == Auron Answer - 1 ==
     !struct<>                             
struct<collect_set(a):array<int>,sort_array(collect_set(b), 
true):array<struct<x:int,y:int>>>
     ![List(1, 2, 3),List([2,2], [4,1])]   [ArrayBuffer(2, 3, 
1),ArrayBuffer([2,2], [4,1])] (SparkQueryTestsBase.scala:73)
   - SPARK-31500: collect_set() of BinaryType returns duplicate 
elements
   ```
   
   

