Kontinuation commented on PR #1511:
URL: https://github.com/apache/datafusion-comet/pull/1511#issuecomment-2745079065

   > interleave_record_batch is much slower running Q18, this problem is still 
under investigation.
   
   This problem happens on macOS 15.3.1 with an Apple M1 Pro chip. I suspect it is OS-specific, so I'll rerun the benchmarks on EC2 instances to see whether it still happens. I found some interesting things while troubleshooting and would like to share them here to get input from other developers.
   
   The slowness of Q18 is not caused by switching to a different shuffle write implementation. One stage of this query sometimes exhibits a large performance outlier, and we happened to hit that outlier when benchmarking interleave_record_batch.
   
   The problematic stage is in the 5th Spark job of Q18. It has the following 
native query plan.
   
   ```
   SortMergeJoin: join_type=LeftSemi, on=[(col_0@0, col_0@0)]
     SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false]
       CopyExec [UnpackOrDeepCopy]
         ScanExec: source=[ShuffleQueryStage (unknown), Statistics(sizeInBytes=5.0 GiB, rowCount=1.50E+8)], schema=[col_0: Int64, col_1: Int64, col_2: Decimal128(12, 2), col_3: Date32]
     SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false]
       CopyExec [UnpackOrClone]
         ProjectionExec: expr=[col_0@0 as col_0]
           CometFilterExec: col_1@1 IS NOT NULL AND col_1@1 > Some(31300),22,2
             ProjectionExec: expr=[col_0@0 as col_0, sum@1 as col_1]
               AggregateExec: mode=Final, gby=[col_0@0 as col_0], aggr=[sum]
                 ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(22, 2), col_2: Boolean]
   ```
   
   Sometimes the aggregation operator is noticeably slower than usual, and the stage timeline shows tasks taking a very long time to finish during a specific period:
   
   | | Normal | Slow |
   |-|-|-|
   | SQL Metrics | <img width="1037" alt="comet-aggr-normal" src="https://github.com/user-attachments/assets/a890f3af-24a6-4395-a2d6-5667ad281947" /> | <img width="1039" alt="comet-aggr-slow" src="https://github.com/user-attachments/assets/fbfa1146-61c8-4870-addf-6df82bba4443" /> |
   | Stage Timeline | <img width="1452" alt="comet-timeline-normal" src="https://github.com/user-attachments/assets/36ec6345-047d-4aaf-807d-bf0401d5090b" /> | <img width="1447" alt="comet-timeline-slow" src="https://github.com/user-attachments/assets/27e55e84-8d55-4f85-8c60-e2b54bc8c316" /> |
   
   I fired up Instruments to profile the thread states and see whether the threads were blocked on system calls or whether there were scheduler issues. I found that all the executors were blocked on a `mach_msg2_trap` system call for more than 4 seconds when the problem was reproduced. This system call is initiated by a Vec resize in [`SumDecimalGroupsAccumulator::merge_batch`](https://github.com/apache/datafusion-comet/blob/0.7.0/native/spark-expr/src/agg_funcs/sum_decimal.rs#L417). Here is the backtrace:
   
   ```
   mach_msg2_trap [0x19597e000 + 3925]  
   mach_msg2_internal [0x19597e000 + 79364]     
   vm_copy [0x19597e000 + 16392]        
   szone_realloc [0x1957c7000 + 23316]  
   _malloc_zone_realloc [0x1957c7000 + 199632]  
   _realloc [0x1957c7000 + 201852]      
   alloc::raw_vec::finish_grow::hd119b9e7c589b6bd [0x30d7ec000 + 37008760]
   alloc::raw_vec::RawVecInner$LT$A$GT$::reserve::do_reserve_and_handle::h6f1db5e8b36dc76d [0x30d7ec000 + 37009068]
   _$LT$datafusion_comet_spark_expr..agg_funcs..sum_decimal..SumDecimalGroupsAccumulator$u20$as$u20$datafusion_expr_common..groups_accumulator..GroupsAccumulator$GT$::merge_batch::haff18eaf0521d806 [0x30d7ec000 + 4524316]
   datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch::h5261364a61419b41 [0x30d7ec000 + 12827928]
   _$LT$datafusion_physical_plan..aggregates..row_hash..GroupedHashAggregateStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::h505f0a302fafaa47 [0x30d7ec000 + 12820756]
   _$LT$datafusion_physical_plan..projection..ProjectionStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::he04d32ceceda7b27 [0x30d7ec000 + 13434900]
   _$LT$comet..execution..operators..filter..FilterExecStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::had23e7d114a415e4 [0x30d7ec000 + 1230048]
   _$LT$datafusion_physical_plan..projection..ProjectionStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::he04d32ceceda7b27 [0x30d7ec000 + 13434900]
   _$LT$comet..execution..operators..copy..CopyStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::h5344363c739c6ceb [0x30d7ec000 + 1202008]
   _$LT$datafusion_physical_plan..stream..RecordBatchStreamAdapter$LT$S$GT$$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::h316229feb0b76bab [0x30d7ec000 + 13694588]
   datafusion_physical_plan::joins::sort_merge_join::SortMergeJoinStream::poll_buffered_batches::hf8abd48295b3827c [0x30d7ec000 + 13258056]
   _$LT$datafusion_physical_plan..joins..sort_merge_join..SortMergeJoinStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::h942cbfae18ce5bbf [0x30d7ec000 + 13251856]
   Java_org_apache_comet_Native_executePlan [0x30d7ec000 + 1181408]     
   0x11296e8d3 [0x0 + 4606847187]       
   0x112bdd86f [0x0 + 4609398895]       
   0x20000130a7 [0x0 + 137439031463]    
   ```
   
   I added some logging around this Vec resize to measure the allocated memory size and the elapsed time:
   
   ```diff
            // Make sure we have enough capacity for the additional groups
   +        let groups_before = self.sum.len();
   +        let allocated_size_before = self.sum.allocated_size();
   +        let start_ns = Instant::now();
            self.sum.resize(total_num_groups, 0);
   +        let end_ns = Instant::now();
   +        let system_time = SystemTime::now();
   +        let datetime: DateTime<Local> = DateTime::from(system_time);
   +        let formatted_date = datetime.format("%Y-%m-%d %H:%M:%S");
   +        let duration = end_ns.duration_since(start_ns);
   +        let allocated_size_after = self.sum.allocated_size();
   +        if duration.as_secs() >= 1 {
   +            println!(
   +                "[{}] resize sum, total_num_groups: from {} to {}, cost: {:?}, allocated_size: from {} to {}",
   +                formatted_date,
   +                groups_before,
   +                total_num_groups,
   +                duration,
   +                allocated_size_before,
   +                allocated_size_after
   +            );
   +        }
   +
   ```
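
The same measurement can be tried outside of Comet. The following is only a minimal standalone sketch, not the code in the PR: `timed` and `resize_with_report` are hypothetical helper names, and it assumes the accumulator state behaves like a plain `Vec<i128>` whose allocated size is capacity times element size.

```rust
use std::mem::size_of;
use std::time::{Duration, Instant};

/// Runs `f` and returns its result together with the wall-clock time it took.
fn timed<T>(f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = f();
    (out, start.elapsed())
}

/// Hypothetical helper: resizes `values` to `new_len` and returns a report
/// line when the resize took at least `threshold`, otherwise `None`.
fn resize_with_report(
    values: &mut Vec<i128>,
    new_len: usize,
    threshold: Duration,
) -> Option<String> {
    let len_before = values.len();
    // Assumption: allocated size is capacity * element size.
    let bytes_before = values.capacity() * size_of::<i128>();
    let ((), cost) = timed(|| values.resize(new_len, 0));
    let bytes_after = values.capacity() * size_of::<i128>();
    if cost < threshold {
        return None;
    }
    Some(format!(
        "resize sum, total_num_groups: from {} to {}, cost: {:?}, allocated_size: from {} to {}",
        len_before, new_len, cost, bytes_before, bytes_after
    ))
}
```

Calling `resize_with_report(&mut values, new_len, Duration::from_secs(1))` in a loop that grows `new_len` step by step would mimic the 1-second reporting threshold used in the diff above.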
   
   The results are quite interesting: every Vec resize that takes a long time grows the allocation from 4 MB to 8 MB.
   
   ```
   [2025-03-22 11:43:38] resize sum, total_num_groups: from 257716 to 265908, cost: 1.175402708s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:43:42] resize sum, total_num_groups: from 257823 to 266015, cost: 3.334687417s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:43:43] resize sum, total_num_groups: from 258481 to 266673, cost: 1.752088083s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:43:43] resize sum, total_num_groups: from 258330 to 266522, cost: 2.100056958s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:43:43] resize sum, total_num_groups: from 258318 to 266510, cost: 4.440020292s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:43:43] resize sum, total_num_groups: from 257869 to 266061, cost: 1.192288583s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:43:43] resize sum, total_num_groups: from 257632 to 265824, cost: 2.533776709s, allocated_size: from 4194304 to 8388608
   ...
   [2025-03-22 11:45:38] resize sum, total_num_groups: from 258033 to 266225, cost: 2.53457375s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:45:38] resize sum, total_num_groups: from 257077 to 265269, cost: 2.824218625s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:45:39] resize sum, total_num_groups: from 257830 to 266022, cost: 3.062193208s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:45:39] resize sum, total_num_groups: from 257463 to 265655, cost: 1.567723125s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:45:39] resize sum, total_num_groups: from 257922 to 266114, cost: 3.057905042s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:45:39] resize sum, total_num_groups: from 257271 to 265463, cost: 3.084830708s, allocated_size: from 4194304 to 8388608
   [2025-03-22 11:45:39] resize sum, total_num_groups: from 257960 to 266152, cost: 3.058556334s, allocated_size: from 4194304 to 8388608
   ```
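
The group counts in these log lines are consistent with a capacity boundary being crossed. If each group's sum occupies 16 bytes (an assumption: one `i128` per group), a 4,194,304-byte allocation holds 262,144 groups, and every logged resize goes from below that count to above it. The sketch below works through the arithmetic; `required_bytes` and `grown_capacity` are hypothetical names, and the doubling rule is a simplified model of amortized Vec growth rather than a documented guarantee.

```rust
/// Assumed element size: 16 bytes, i.e. one `i128` sum per group.
const ELEM_SIZE: usize = 16;

/// Bytes needed to store `groups` sums.
fn required_bytes(groups: usize) -> usize {
    groups * ELEM_SIZE
}

/// Simplified growth model: keep the current capacity if it is large enough,
/// otherwise take the larger of "double the capacity" and "what is required".
fn grown_capacity(cap_bytes: usize, required: usize) -> usize {
    if required <= cap_bytes {
        cap_bytes
    } else {
        (cap_bytes * 2).max(required)
    }
}
```

With the first log line as input, `required_bytes(257716)` still fits in 4,194,304 bytes, while `required_bytes(265908)` does not, so the model yields a new capacity of 8,388,608 bytes, matching the logged `allocated_size`.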
   
   I summarized the reallocation sizes from the logs of 10 runs of Q18. The Vec allocation in `SumDecimalGroupsAccumulator::merge_batch` usually grows from 128 KB to 16 MB for each partition/task. Strangely, the slowness only occurs when growing from 4 MB to 8 MB, not for any other reallocation size.
   
   ```
   Allocation Cost Summary (sorted by average cost)
   ----------------------------------------------------------------------------------------------------
        From →         To | Count |     Avg Cost |     Min Cost |     Max Cost |   Total Cost
   ----------------------------------------------------------------------------------------------------
   4,194,304 →  8,388,608 |  4000 |     64.51 ms |      1.50 µs |      13.91 s |     258.04 s
   8,388,608 → 16,777,216 |  4000 |      2.80 ms |      1.16 ms |     48.09 ms |      11.18 s
   2,097,152 →  4,194,304 |  4000 |    152.83 µs |      1.33 µs |      5.33 ms |    611.31 ms
   1,048,576 →  2,097,152 |  4000 |     60.74 µs |      1.29 µs |      2.05 ms |    242.97 ms
     524,288 →  1,048,576 |  4000 |     28.98 µs |    292.00 ns |      2.70 ms |    115.92 ms
     262,144 →    524,288 |  4000 |      9.66 µs |     83.00 ns |    775.25 µs |     38.66 ms
     131,072 →    262,144 |  4000 |      7.32 µs |    875.00 ns |      1.94 ms |     29.30 ms
           0 →    131,072 |  4000 |      3.11 µs |    792.00 ns |    761.17 µs |     12.44 ms
           0 →        192 |    10 |    262.50 ns |     41.00 ns |    667.00 ns |      2.62 µs
           0 →         80 |   290 |    187.34 ns |      0.00 ns |      1.25 µs |     54.33 µs
           0 →         96 |   240 |    175.21 ns |      0.00 ns |    875.00 ns |     42.05 µs
           0 →        128 |   100 |    147.50 ns |      0.00 ns |    708.00 ns |     14.75 µs
           0 →         64 |  1110 |    132.08 ns |      0.00 ns |      2.42 µs |    146.61 µs
           0 →        112 |   120 |    127.44 ns |     41.00 ns |    459.00 ns |     15.29 µs
           0 →        144 |    50 |    115.86 ns |     41.00 ns |    292.00 ns |      5.79 µs
           0 →        160 |    30 |    107.10 ns |     41.00 ns |    250.00 ns |      3.21 µs
           0 →        176 |    30 |    105.53 ns |     41.00 ns |    250.00 ns |      3.17 µs
   ----------------------------------------------------------------------------------------------------
   ```
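
A summary like the one above can be produced by grouping the individual resize measurements by their (from, to) allocation sizes. The following is only a sketch of that aggregation, not the script actually used: `CostStats` and `summarize` are hypothetical names, and the samples are assumed to be already parsed into `(from_bytes, to_bytes, cost)` tuples.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Per-transition statistics, mirroring the columns of the summary table.
#[derive(Debug, Clone, Copy, PartialEq)]
struct CostStats {
    count: u32,
    min: Duration,
    max: Duration,
    total: Duration,
}

impl CostStats {
    /// Average cost; `count` is at least 1 for every entry `summarize` builds.
    fn avg(&self) -> Duration {
        self.total / self.count
    }
}

/// Groups samples by (from, to) size and sorts rows by average cost, descending.
fn summarize(samples: &[(usize, usize, Duration)]) -> Vec<((usize, usize), CostStats)> {
    let mut by_transition: HashMap<(usize, usize), CostStats> = HashMap::new();
    for &(from, to, cost) in samples {
        by_transition
            .entry((from, to))
            .and_modify(|s| {
                s.count += 1;
                s.min = s.min.min(cost);
                s.max = s.max.max(cost);
                s.total += cost;
            })
            .or_insert(CostStats { count: 1, min: cost, max: cost, total: cost });
    }
    let mut rows: Vec<_> = by_transition.into_iter().collect();
    rows.sort_by(|a, b| b.1.avg().cmp(&a.1.avg()));
    rows
}
```

Keeping min and max alongside the average matters here: the 4 MB to 8 MB row has a minimum of 1.50 µs but a maximum of 13.91 s, so the average alone would hide how bimodal the cost is.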


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

