marcin-krystianc opened a new issue, #38087:
URL: https://github.com/apache/arrow/issues/38087

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   Hi,
   
   We've encountered a performance issue when reading Parquet files with the PyArrow library.
   In our use case, we read a small subset of columns (e.g. 100) from a Parquet file that contains many more columns (e.g. 20k).
   
   The difference in performance between PyArrow and raw C++ is 4-5x, which is far too large to be acceptable.
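   
   For reference, the read pattern in question boils down to this minimal sketch (it reuses the file path and the `c0, c1, ...` column-naming scheme from the repro code below):
   
   ``` python
   import pyarrow.parquet as pq
   
   # Read only the first 100 columns of a file that contains many more.
   subset = pq.read_table(
       "/tmp/test_wide.parquet",
       columns=[f"c{i}" for i in range(100)],
       use_threads=False,
   )
   ```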
   
   The repro code: https://github.com/marcin-krystianc/arrow_issue_2023-10-06
   In the graph below (produced with https://github.com/marcin-krystianc/arrow_issue_2023-10-06/blob/master/plot_results.py), we can clearly see that when we read 100 columns from a Parquet file (the orange line), the column reading time in Python (on the left) is about 5x higher than in C++ (on the right).
   <img width="1916" alt="Screenshot 2023-10-06 143748" src="https://github.com/apache/arrow/assets/2308005/02518d4e-7def-4e3c-91fb-12e37c1b7c1b">
   
   Interestingly, when we read all of the columns, the difference in reading time is much smaller:
   <img width="1919" alt="Screenshot 2023-10-06 151957" src="https://github.com/apache/arrow/assets/2308005/41eb07cb-65c0-4bc1-bbef-6d73cbece2f7">
   
   Given that PyArrow delegates the actual Parquet reading to the native C++ library, it is quite surprising that the overhead is so high and that it fluctuates so much across scenarios.
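   
   One rough way to narrow down where the extra time goes is to profile the Python side (a sketch; `cProfile` only sees Python-level frames, so it mainly shows whether the overhead lives in PyArrow's Python layer or inside the native call itself):
   
   ``` python
   import cProfile
   import pstats
   
   import pyarrow.parquet as pq
   
   path = "/tmp/test_wide.parquet"
   cols = [f"c{i}" for i in range(100)]
   
   profiler = cProfile.Profile()
   profiler.enable()
   pq.read_table(path, columns=cols, use_threads=False)
   profiler.disable()
   
   # Sort by cumulative time to see which Python-level calls dominate.
   pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)
   ```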
   
   
   <details>
   <summary>Python code:</summary>
   
   ``` python
   import numpy as np
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   import time
   import polars as pl
   import csv
   import gc
   
   t_write = []
   t_read_100_pre_buffer = []  # note: unused in this version of the script
   
   path = "/tmp/test_wide.parquet"
   
   columns_list = [
                   100, 200, 300, 400, 500, 600, 700, 800, 900,              
                   1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
                   10_000, 20_000, 30_000, 40_000, 50_000, 
                   ]
   
   chunks_list = [1000, 10_000]
   rows_list = [5000]
   with open('results_python.csv', 'w', encoding='UTF8', newline='') as f:
   
       writer = csv.writer(f)
       # write the header
       writer.writerow(['columns','rows','chunk_size','writing(μs)','reading_all(μs)','reading_100(μs)'])
   
       for chunk_size in chunks_list:
           for rows in rows_list:
               for columns in columns_list:
         
                   table = pl.DataFrame(
                       data=np.random.randn(rows, columns),
                       schema=[f"c{i}" for i in range(columns)]).to_arrow()
   
                   t = time.time()
                   pq.write_table(table, path, row_group_size=chunk_size, use_dictionary=False, write_statistics=False)
                   t_writing = time.time() - t
                   t_write.append(t_writing)
   
                   del table
                   gc.collect()
   
                   t_read = []
                   t_read_100 = []
   
                   for i in range(0, 3):
   
                       t = time.time()
                       res = pq.read_table(path, use_threads=False)
                       t_read.append(time.time() - t)
           
                       del res 
                       gc.collect()
   
                       t = time.time()
                       res_100 = pq.read_table(path, columns=[f"c{i}" for i in range(100)], use_threads=False)
                       t_read_100.append(time.time() - t)
                   
                       del res_100
                       gc.collect()
   
                   t_reading = min(t_read)
                   t_reading_100 = min(t_read_100)
   
                   data = [columns, rows, chunk_size, t_writing * 1_000_000, t_reading * 1_000_000, t_reading_100 * 1_000_000]
                   writer.writerow(data)
                   print(str(data))
   ```
   
   </details>
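   
   As an aside, the unused `t_read_100_pre_buffer` list in the script above points at one more knob worth testing: `pq.read_table` accepts a `pre_buffer` flag that coalesces the byte ranges of the selected column chunks into larger reads. A hedged sketch of that variant (not part of the measurements above):
   
   ``` python
   import pyarrow.parquet as pq
   
   # Same 100-column read as in the benchmark, but with pre-buffering enabled;
   # pre_buffer=True coalesces the selected column-chunk ranges before decoding.
   res_100_pb = pq.read_table(
       "/tmp/test_wide.parquet",
       columns=[f"c{i}" for i in range(100)],
       use_threads=False,
       pre_buffer=True,
   )
   ```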
   
   <details>
   <summary>C++ code:</summary>
   
   ``` cpp
   #include "arrow/api.h"
   #include "arrow/io/api.h"
   #include "arrow/result.h"
   #include "arrow/util/type_fwd.h"
   #include "parquet/arrow/reader.h"
   #include "parquet/arrow/writer.h"
   
   #include <algorithm>
   #include <chrono>
   #include <fstream>
   #include <iomanip>
   #include <iostream>
   #include <list>
   #include <numeric>
   #include <random>
   #include <vector>
   
   using arrow::Status;
   
   namespace
   {
     const char *FILE_NAME = "/tmp/my_cpp.parquet";
   
     std::shared_ptr<arrow::Table> GetTable(size_t nColumns, size_t nRows)
     {
       std::random_device dev;
       std::mt19937 rng(dev());
       std::uniform_real_distribution<> rand_gen(0.0, 1.0);
   
       std::vector<std::shared_ptr<arrow::Array>> arrays;
       std::vector<std::shared_ptr<arrow::Field>> fields;
   
       // For simplicity, every column is a float64 column of uniform random values.
       for (size_t i = 0; i < nColumns; i++)
       {
         arrow::DoubleBuilder builder;
         for (size_t j = 0; j < nRows; j++)
         {
           if (!builder.Append(rand_gen(rng)).ok())
             throw std::runtime_error("builder.Append");
         }
   
         std::shared_ptr<arrow::Array> array;
         if (!builder.Finish(&array).ok())
           throw std::runtime_error("builder.Finish");
   
         arrays.push_back(array);
         fields.push_back(arrow::field("c_" + std::to_string(i), arrow::float64(), false));
       }
   
       auto table = arrow::Table::Make(arrow::schema(fields), arrays);
       return table;
     }
   
     Status WriteTableToParquet(size_t nColumns, size_t nRows, const std::string &filename, std::chrono::microseconds *dt, int64_t chunkSize)
     {
       auto table = GetTable(nColumns, nRows);
       auto begin = std::chrono::steady_clock::now();
       auto result = arrow::io::FileOutputStream::Open(filename);
       auto outfile = result.ValueOrDie();
       parquet::WriterProperties::Builder builder;
       auto properties = builder
                             .max_row_group_length(chunkSize)
                             ->disable_dictionary()
                             ->build();
       PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, chunkSize, properties));
       auto end = std::chrono::steady_clock::now();
       *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
       return Status::OK();
     }
   
     Status ReadEntireTable(const std::string &filename, std::chrono::microseconds *dt)
     {
       auto begin = std::chrono::steady_clock::now();
   
       std::shared_ptr<arrow::io::ReadableFile> infile;
       ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
       std::unique_ptr<parquet::arrow::FileReader> reader;
       ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
       std::shared_ptr<arrow::Table> parquet_table;
   
       // Read the table.
       PARQUET_THROW_NOT_OK(reader->ReadTable(&parquet_table));
       auto end = std::chrono::steady_clock::now();
       *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
       return Status::OK();
     }
   
     Status ReadColumnsAsTable(const std::string &filename, const std::vector<int> &indices, std::chrono::microseconds *dt)
     {
       auto begin = std::chrono::steady_clock::now();
   
       std::shared_ptr<arrow::io::ReadableFile> infile;
       ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
       std::unique_ptr<parquet::arrow::FileReader> reader;
       ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
   
       // Read the table.
       std::shared_ptr<arrow::Table> parquet_table;
       PARQUET_THROW_NOT_OK(reader->ReadTable(indices, &parquet_table));
       auto end = std::chrono::steady_clock::now();
       *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
       return Status::OK();
     }
   
     Status RunMain(int argc, char **argv)
     {
       std::ofstream csvFile;
       csvFile.open("results_cpp.csv", std::ios_base::out); // overwrite any previous results
       csvFile << "columns, rows, chunk_size, writing(μs), reading_all(μs), reading_100(μs)" << std::endl;
   
       std::list<int> nColumns = {
           100, 200, 300, 400, 500, 600, 700, 800, 900,
           1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
           10000, 20000, 30000, 40000, 50000};
   
       std::list<int64_t> chunk_sizes = {1000, 100000};
       std::list<int> rows_list = {5000};
   
       std::vector<int> indices(100);
       std::iota(indices.begin(), indices.end(), 0);
   
       for (auto chunk_size : chunk_sizes)
       {
         for (int nRow : rows_list)
         {
           for (int nColumn : nColumns)
           {
             std::chrono::microseconds writing_dt;
           ARROW_RETURN_NOT_OK(WriteTableToParquet(nColumn, nRow, FILE_NAME, &writing_dt, chunk_size));
   
             const int repeats = 3;
             std::vector<std::chrono::microseconds> reading_all_dts(repeats);
             std::vector<std::chrono::microseconds> reading_100_dts(repeats);
             for (int i = 0; i < repeats; i++)
             {
               ARROW_RETURN_NOT_OK(ReadEntireTable(FILE_NAME, &reading_all_dts[i]));
               ARROW_RETURN_NOT_OK(ReadColumnsAsTable(FILE_NAME, indices, &reading_100_dts[i]));
             }
   
             auto reading_all_dt = *std::min_element(reading_all_dts.begin(), reading_all_dts.end());
             auto reading_100_dt = *std::min_element(reading_100_dts.begin(), reading_100_dts.end());
   
             std::cerr << "(" << nColumn << ", " << nRow << ")"
                       << ", chunk_size=" << chunk_size
                       << ", writing_dt=" << writing_dt.count() / nColumn
                       << ", reading_all_dt=" << reading_all_dt.count() / 
nColumn
                       << ", reading_100_dt=" << reading_100_dt.count() / 100
                       << std::endl;
   
             csvFile << nColumn << ","
                     << nRow << ","
                     << chunk_size << ","
                     << writing_dt.count() << ","
                     << reading_all_dt.count() << ","
                     << reading_100_dt.count()
                     << std::endl;
           }
         }
       }
   
       return Status::OK();
     }
   }
   
   int main(int argc, char **argv)
   {
     Status st = RunMain(argc, argv);
     if (!st.ok())
     {
       std::cerr << st << std::endl;
       return 1;
     }
     return 0;
   }
   ```
   </details>
   
    
   
   ### Component(s)
   
   Parquet

