psuszyns opened a new issue, #20788:
URL: https://github.com/apache/datafusion/issues/20788

   ### Describe the bug
   
   Unnesting several columns followed by a group by results in extremely 
high memory usage. A 341 MB Parquet file containing 3 array columns, with 20000 
records and 2000 elements in each array column, processed by the query outlined 
in the 'To Reproduce' section results in more than 53 GB of RAM usage.
   
   ### To Reproduce
   
   ```python
   """
   Minimal DataFusion example demonstrating memory explosion with:
     row_index + unnest + group_by
   
   This example creates a Parquet file with list columns and shows how the 
   unnest + group_by pattern causes unbounded memory growth even when the 
   input is processed in streaming mode.
   
   Usage:
       python datafusion_unnest_memory_issue.py generate  # Generate test data
       python datafusion_unnest_memory_issue.py monitor   # Run with memory 
monitoring
   """
   
   import sys
   import os
   import psutil
   import threading
   import pyarrow as pa
   import pyarrow.parquet as pq
   import numpy as np
   import datafusion
   from datafusion import col, literal, functions as f
   import time
   import tracemalloc
   
   # Configuration
   NUM_ROWS = 20_000       # Number of records - for 20k records more than 64 
GB of memory is recommended
   LIST_SIZE = 2000         # Number of elements per list
   OUTPUT_DIR = "datafusion_test_data"
   PARQUET_FILE = 
f"{OUTPUT_DIR}/test_lists_{NUM_ROWS}rows_{LIST_SIZE}elements.parquet"
   
   
   def generate_test_data():
       """
       Generate a Parquet file with multiple list columns.
       
       Schema:
           metadata: string   - some scalar data
           values_a: list<int32>    - list column A
           values_b: list<float32>  - list column B
           values_c: list<float32>  - list column C
       
       Each row has LIST_SIZE elements in each list column.
       """
       print(f"Generating test data: {NUM_ROWS} rows x {LIST_SIZE} list 
elements")
       print(f"Expected intermediate rows after unnest: {NUM_ROWS * LIST_SIZE}")
       
       os.makedirs(OUTPUT_DIR, exist_ok=True)
       
       # Generate data in chunks
       chunk_size = 10000
       writer = None
       
       for chunk_start in range(0, NUM_ROWS, chunk_size):
           chunk_end = min(chunk_start + chunk_size, NUM_ROWS)
           chunk_rows = chunk_end - chunk_start
           
           # Generate scalar columns
           metadata = [f"row_{i}" for i in range(chunk_start, chunk_end)]
           
           # Generate list columns - each row has LIST_SIZE elements
           # Using different value patterns to make data realistic
           np.random.seed(42 + chunk_start)
           
           values_a = [
               np.random.randint(0, 100, size=LIST_SIZE).tolist()
               for _ in range(chunk_rows)
           ]
           values_b = [
               np.random.uniform(0, 100, 
size=LIST_SIZE).astype(np.float32).tolist()
               for _ in range(chunk_rows)
           ]
           values_c = [
               np.random.uniform(0, 200, 
size=LIST_SIZE).astype(np.float32).tolist()
               for _ in range(chunk_rows)
           ]
           
           # Create Arrow arrays
           table = pa.table({
               "metadata": pa.array(metadata, type=pa.string()),
               "values_a": pa.array(values_a, type=pa.list_(pa.int32())),
               "values_b": pa.array(values_b, type=pa.list_(pa.float32())),
               "values_c": pa.array(values_c, type=pa.list_(pa.float32())),
           })
           
           if writer is None:
               writer = pq.ParquetWriter(PARQUET_FILE, table.schema)
           
           writer.write_table(table)
           print(f"  Written rows {chunk_start} - {chunk_end}")
       
       writer.close()
       
       file_size_mb = os.path.getsize(PARQUET_FILE) / (1024 * 1024)
       print(f"\nGenerated: {PARQUET_FILE}")
       print(f"File size: {file_size_mb:.1f} MB")
       print(f"Total rows: {NUM_ROWS}")
       print(f"Elements per list: {LIST_SIZE}")
       print(f"Expected unnested rows: {NUM_ROWS * LIST_SIZE}")
   
   
   def run_problematic_query():
       """
       Demonstrate the memory issue with unnest + group_by pattern. This 
pattern requires:
       1. Adding a row index (to group by later)
       2. Unnesting multiple list columns
       3. Grouping by the row index
       
       The problem: Step 3 must buffer ALL unnested rows before it can emit 
results,
       causing memory to grow with input size x list size.
       """
       ctx = datafusion.SessionContext()
       
       print(f"\nRegistering Parquet file: {PARQUET_FILE}")
       ctx.register_parquet("test_data", PARQUET_FILE)
       
       # Show input stats
       result = ctx.sql("SELECT COUNT(*) as cnt FROM test_data").collect()
       input_rows = result[0]["cnt"][0]
       print(f"Input rows: {input_rows}")
       
       # Run the problematic query
       print("\n--- Unnest + Group By (MEMORY EXPLOSION EXPECTED) ---")
       print("This query unnests list columns and groups by row index.")
       print(f"Expected intermediate rows: {int(input_rows) * LIST_SIZE}")
       print("\nStarting query... (watch memory usage)")
       
       start = time.time()
       tracemalloc.start()
       
       # The problematic pattern using SQL:
       problematic_query = """
       WITH indexed AS (
           SELECT 
               ROW_NUMBER() OVER () as row_idx,
               metadata,
               values_a,
               values_b,
               values_c
           FROM test_data
       ),
       unnested AS (
           SELECT 
               row_idx,
               metadata,
               unnest(values_a) as val_a,
               unnest(values_b) as val_b,
               unnest(values_c) as val_c
           FROM indexed
       ),
       transformed AS (
           SELECT
               row_idx,
               metadata,
               val_a,
               val_b,
               val_c,
               -- Example transformation: create a new column based on val_a, 
val_b and val_c
               CASE WHEN val_c > 100 THEN val_a * val_b ELSE val_a + val_b END 
AS val_d
           FROM unnested
       )
       SELECT
           row_idx,
           metadata,
           array_agg(val_a ORDER BY row_idx) AS values_a,
           array_agg(val_b ORDER BY row_idx) AS values_b,
           array_agg(val_c ORDER BY row_idx) AS values_c,
           array_agg(val_d ORDER BY row_idx) AS values_d
       FROM transformed
       GROUP BY row_idx, metadata
       ORDER BY row_idx
       """
       
       try:
           print("\nExecuting query...")
           result = ctx.sql(problematic_query)
   
           # we want to stream the results to the output file
           output_file = f"{OUTPUT_DIR}/query_result.parquet"
           result.write_parquet(output_file)
           
           current, peak = tracemalloc.get_traced_memory()
           tracemalloc.stop()
           
           print(f"\nQuery completed!")
           print(f"Time: {time.time() - start:.2f}s")
           print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")
                   
       except Exception as e:
           current, peak = tracemalloc.get_traced_memory()
           tracemalloc.stop()
           
           print(f"\nQuery failed after {time.time() - start:.2f}s")
           print(f"Peak memory before failure: {peak / 1024 / 1024:.1f} MB")
           print(f"Error: {e}")
   
   
   def run_with_memory_monitoring():
       """
       Run the query with detailed memory monitoring using psutil.
       """
       
       process = psutil.Process(os.getpid())
       max_memory = [0]
       stop_monitoring = [False]
       
       def monitor_memory():
           while not stop_monitoring[0]:
               mem = process.memory_info().rss / 1024 / 1024
               max_memory[0] = max(max_memory[0], mem)
               time.sleep(0.1)
       
       monitor_thread = threading.Thread(target=monitor_memory)
       monitor_thread.start()
       
       try:
           initial_mem = process.memory_info().rss / 1024 / 1024
           print(f"Initial memory: {initial_mem:.1f} MB")
           
           run_problematic_query()
           
       finally:
           stop_monitoring[0] = True
           monitor_thread.join()
           
           final_mem = process.memory_info().rss / 1024 / 1024
           print(f"\n=== Memory Summary ===")
           print(f"Initial: {initial_mem:.1f} MB")
           print(f"Peak: {max_memory[0]:.1f} MB")
           print(f"Final: {final_mem:.1f} MB")
           print(f"Growth: {max_memory[0] - initial_mem:.1f} MB")
   
   
   def show_usage():
       print(__doc__)
       print("\nCommands:")
       print("  generate  - Generate test Parquet file")
       print("  run       - Run the problematic query")
       print("  monitor   - Run with memory monitoring (requires psutil)")
   
   
   if __name__ == "__main__":
       if len(sys.argv) < 2:
           show_usage()
           sys.exit(1)
       
       command = sys.argv[1]
       
       if command == "generate":
           generate_test_data()
       elif command == "run":
           if not os.path.exists(PARQUET_FILE):
               print(f"Test data not found. Run 'python {sys.argv[0]} generate' 
first.")
               sys.exit(1)
           run_problematic_query()
       elif command == "monitor":
           if not os.path.exists(PARQUET_FILE):
               print(f"Test data not found. Run 'python {sys.argv[0]} generate' 
first.")
               sys.exit(1)
           run_with_memory_monitoring()
       else:
           show_usage()
           sys.exit(1)
   ```
   
   ### Expected behavior
   
   Executing the above query should be possible with constant memory usage in a 
streaming fashion.
   
   ### Additional context
   
   The given example is a toy example, but it very closely resembles a real use 
case encountered when working with the bioinformatics VCF format.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to