westonpace commented on issue #34474:
URL: https://github.com/apache/arrow/issues/34474#issuecomment-1458582856

   Thank you very much for the report!
   
   > #"Pyarrow join non-null rows: 37317087" (also changes from run to run)
   
   Hmm, there isn't much that is non-deterministic in a hash join.  So my guess would be that this is some sort of race condition.  Perhaps we are scheduling more tasks at the larger input size and that is leading to the issue.  I was able to come up with a reproducer that runs in under a minute and _should_ be runnable with 32GB of RAM:
   
   ```
   import pyarrow as pa            # 11.0.0
   import pyarrow.compute as pc
   import pandas as pd             # 1.5.3
   import numpy as np              # 1.23.5

   # Generate join key data
   n_rows = 72_000_000
   n_nan_rows = 10_000_000
   join_keys = [f'col{i}' for i in range(1, 10)]

   some_date = pd.to_datetime('2000-01-01')
   col_date = pa.array([some_date for i in range(n_rows)])
   col_int = pa.array([i for i in range(n_rows)])
   col_str = pc.cast(col_int, pa.string())

   # Create the two tables -- df1_pa and df2_pa are identical except for the 'val' column
   df1_pa = pa.Table.from_pydict({'col1': col_str,
                                  'col2': col_str,
                                  'col3': col_str,
                                  'col4': col_str,
                                  'col5': col_str,
                                  'col6': col_date,
                                  'col7': col_date,
                                  'col8': col_date,
                                  'col9': col_int})
   print(f'left nbytes: {df1_pa.nbytes}')

   # 'val' is an integer ramp followed by a block of NaNs
   values = pa.array([i for i in range(n_rows - n_nan_rows)] + [np.nan for i in range(n_nan_rows)])

   df2_pa = pa.Table.from_pydict({'col1': col_str,
                                  'col2': col_str,
                                  'col3': col_str,
                                  'col4': col_str,
                                  'col5': col_str,
                                  'col6': col_date,
                                  'col7': col_date,
                                  'col8': col_date,
                                  'col9': col_int,
                                  'val': values})
   print(f'right nbytes: {df2_pa.nbytes}')

   merge_pa = df1_pa.join(df2_pa, keys=join_keys, join_type='left outer')
   vals_merged = merge_pa.column('val')

   non_null_count = pc.count(vals_merged)
   nan_count = len(vals_merged.filter(pc.is_nan(vals_merged)))
   print(f'Non-null rows in val: {non_null_count}')
   print(f'Expected: {n_nan_rows} Actual: {nan_count}')
   ```
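   
   If this really is a thread-scheduling race, the counts should become stable when the join runs single-threaded.  A minimal sketch of that check (assuming `df1_pa`, `df2_pa`, and `join_keys` from the reproducer above are still in scope; each iteration reruns the full join, so it takes a while):
   
   ```
   # Hypothesis check: if the miscount comes from a race in the multi-threaded
   # hash join, single-threaded runs should always report the expected count,
   # while multi-threaded runs may vary from run to run.
   for use_threads in (True, False):
       counts = []
       for _ in range(3):
           merged = df1_pa.join(df2_pa, keys=join_keys,
                                join_type='left outer', use_threads=use_threads)
           counts.append(pc.count(merged.column('val')).as_py())
       print(f'use_threads={use_threads}: non-null counts over 3 runs: {counts}')
   ```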
   
   This will be a tricky one to get to the bottom of, I think.

