marcin-krystianc opened a new issue, #38087: URL: https://github.com/apache/arrow/issues/38087
### Describe the bug, including details regarding any error messages, version, and platform.

Hi, we've encountered a performance issue when reading Parquet files with the PyArrow library. In our use case, we read a subset of columns (e.g. 100) from a Parquet file containing many more columns (e.g. 20k). The difference in performance between PyArrow and raw C++ is 4-5x, which is way too much to be acceptable.

The repro code: https://github.com/marcin-krystianc/arrow_issue_2023-10-06

In the graph below (produced with https://github.com/marcin-krystianc/arrow_issue_2023-10-06/blob/master/plot_results.py), we can clearly see that when we read 100 columns from a Parquet file (the orange line), the column reading time in Python (on the left) is about 5x higher than in C++ (on the right).

<img width="1916" alt="Screenshot 2023-10-06 143748" src="https://github.com/apache/arrow/assets/2308005/02518d4e-7def-4e3c-91fb-12e37c1b7c1b">

Interestingly, when we compare the column reading time for reading all columns, the difference is much smaller:

<img width="1919" alt="Screenshot 2023-10-06 151957" src="https://github.com/apache/arrow/assets/2308005/41eb07cb-65c0-4bc1-bbef-6d73cbece2f7">

Given that PyArrow delegates the actual Parquet reading to the native C++ library, it is quite surprising that the overhead is so high and fluctuates significantly across scenarios.
<details>
<summary>Python code:</summary>

``` python
# Benchmark: for each (chunk_size, rows, columns) combination, write a table of
# random float64 columns to a Parquet file, then time (a) reading the whole
# file and (b) reading only the first 100 columns. The best of 3 runs for each
# read is written to results_python.csv, in microseconds.
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import time
import polars as pl
import csv
import gc

t_write = []
path = "/tmp/test_wide.parquet"

columns_list = [
    100, 200, 300, 400, 500, 600, 700, 800, 900,
    1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
    10_000, 20_000, 30_000, 40_000, 50_000,
]
chunks_list = [1000, 10_000]
rows_list = [5000]

with open('results_python.csv', 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)

    # write the header
    writer.writerow(['columns','rows','chunk_size','writing(μs)','reading_all(μs)','reading_100(μs)'])

    for chunk_size in chunks_list:
        for rows in rows_list:
            for columns in columns_list:
                table = pl.DataFrame(
                    data=np.random.randn(rows, columns),
                    schema=[f"c{i}" for i in range(columns)]).to_arrow()

                t = time.time()
                pq.write_table(table, path, row_group_size=chunk_size, use_dictionary=False, write_statistics=False)
                t_writing = time.time() - t
                t_write.append(t_writing)

                # Release the source table before the read measurements.
                del table
                gc.collect()

                t_read = []
                t_read_100 = []
                for i in range(0, 3):
                    # Read every column.
                    t = time.time()
                    res = pq.read_table(path, use_threads=False)
                    t_read.append(time.time() - t)
                    del res
                    gc.collect()

                    # Read only the first 100 columns.
                    t = time.time()
                    res_100 = pq.read_table(path, columns=[f"c{i}" for i in range(100)], use_threads=False)
                    t_read_100.append(time.time() - t)
                    del res_100
                    gc.collect()

                # Report the fastest of the repeated runs.
                t_reading = min(t_read)
                t_reading_100 = min(t_read_100)

                data = [columns, rows, chunk_size, t_writing * 1_000_000, t_reading * 1_000_000, t_reading_100 * 1_000_000]
                writer.writerow(data)
                print(str(data))
```

</details>

<details>
<summary>C++ code:</summary>

``` cpp
#include "arrow/api.h"
#include "arrow/io/api.h"
#include "arrow/result.h"
#include "arrow/util/type_fwd.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <list>
#include <memory>
#include <numeric>
#include <random>
#include <stdexcept>
#include <string>
#include <vector>

using arrow::Status;

namespace
{
    const char *FILE_NAME = "/tmp/my_cpp.parquet";

    // Builds an in-memory table with nColumns non-nullable float64 columns
    // named "c_<i>", each filled with nRows values drawn from
    // std::uniform_real_distribution<>(0.0, 1.0).
    // Throws std::runtime_error if an Arrow builder call fails.
    std::shared_ptr<arrow::Table> GetTable(size_t nColumns, size_t nRows)
    {
        std::random_device dev;
        std::mt19937 rng(dev());
        std::uniform_real_distribution<> rand_gen(0.0, 1.0);

        std::vector<std::shared_ptr<arrow::Array>> arrays;
        std::vector<std::shared_ptr<arrow::Field>> fields;
        arrays.reserve(nColumns);
        fields.reserve(nColumns);

        // For simplicity, every column is float64. You can expand this to handle other types.
        for (size_t i = 0; i < nColumns; i++)
        {
            arrow::DoubleBuilder builder;
            for (size_t j = 0; j < nRows; j++)
            {
                if (!builder.Append(rand_gen(rng)).ok())
                    throw std::runtime_error("builder.Append");
            }

            std::shared_ptr<arrow::Array> array;
            if (!builder.Finish(&array).ok())
                throw std::runtime_error("builder.Finish");

            arrays.push_back(array);
            fields.push_back(arrow::field("c_" + std::to_string(i), arrow::float64(), false));
        }

        return arrow::Table::Make(arrow::schema(fields), arrays);
    }

    // Generates a random nColumns x nRows table and writes it to `filename`
    // as Parquet with dictionary encoding disabled and `chunkSize` used both
    // as the maximum row-group length and as the WriteTable chunk size.
    // `*dt` receives the elapsed time of opening the file and writing the
    // table; table generation is not included in the measurement.
    // Returns a non-OK Status if the file cannot be opened or the write fails.
    Status WriteTableToParquet(size_t nColumns, size_t nRows, const std::string &filename,
                               std::chrono::microseconds *dt, int64_t chunkSize)
    {
        auto table = GetTable(nColumns, nRows);

        auto begin = std::chrono::steady_clock::now();

        // Propagate an open failure as a Status instead of aborting the process.
        std::shared_ptr<arrow::io::FileOutputStream> outfile;
        ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(filename));

        parquet::WriterProperties::Builder builder;
        auto properties = builder
                              .max_row_group_length(chunkSize)
                              ->disable_dictionary()
                              ->build();

        ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, chunkSize, properties));

        auto end = std::chrono::steady_clock::now();
        *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
        return Status::OK();
    }

    // Opens `filename` and reads every column into an Arrow table.
    // `*dt` receives the elapsed time of open + read.
    // Returns a non-OK Status if opening or reading fails.
    Status ReadEntireTable(const std::string &filename, std::chrono::microseconds *dt)
    {
        auto begin = std::chrono::steady_clock::now();

        std::shared_ptr<arrow::io::ReadableFile> infile;
        ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));

        std::unique_ptr<parquet::arrow::FileReader> reader;
        ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

        // Read the table.
        std::shared_ptr<arrow::Table> parquet_table;
        ARROW_RETURN_NOT_OK(reader->ReadTable(&parquet_table));

        auto end = std::chrono::steady_clock::now();
        *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
        return Status::OK();
    }

    // Opens `filename` and reads only the columns whose indices are listed in
    // `indicies` into an Arrow table.
    // `*dt` receives the elapsed time of open + read.
    // Returns a non-OK Status if opening or reading fails.
    Status ReadColumnsAsTable(const std::string &filename, const std::vector<int> &indicies,
                              std::chrono::microseconds *dt)
    {
        auto begin = std::chrono::steady_clock::now();

        std::shared_ptr<arrow::io::ReadableFile> infile;
        ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));

        std::unique_ptr<parquet::arrow::FileReader> reader;
        ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

        // Read the table.
        std::shared_ptr<arrow::Table> parquet_table;
        ARROW_RETURN_NOT_OK(reader->ReadTable(indicies, &parquet_table));

        auto end = std::chrono::steady_clock::now();
        *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
        return Status::OK();
    }

    // Runs the benchmark matrix: for every (chunk_size, rows, columns)
    // combination, writes a fresh Parquet file, reads it back in full and
    // with only the first 100 columns (3 repeats each, keeping the minimum),
    // and records the timings in results_cpp.csv.
    // Per-column timings (integer division) are echoed to stderr.
    Status RunMain(int argc, char **argv)
    {
        std::ofstream csvFile;
        // std::ios_base::out truncates: an existing results file is overwritten.
        csvFile.open("results_cpp.csv", std::ios_base::out);
        if (!csvFile.is_open())
            return Status::IOError("Failed to open results_cpp.csv for writing");

        csvFile << "columns, rows, chunk_size, writing(μs), reading_all(μs), reading_100(μs)" << std::endl;

        std::list<int> nColumns = {
            100, 200, 300, 400, 500, 600, 700, 800, 900,
            1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
            10000, 20000, 30000, 40000, 50000};

        // NOTE(review): the Python script uses 10_000 as its second chunk size.
        // With 5000 rows both values put all rows in a single row group, but
        // confirm which value was intended.
        std::list<int64_t> chunk_sizes = {1000, 100000};
        std::list<int> rows_list = {5000};

        // Indices 0..99: the first 100 columns.
        std::vector<int> indicies(100);
        std::iota(indicies.begin(), indicies.end(), 0);

        for (auto chunk_size : chunk_sizes)
        {
            for (int nRow : rows_list)
            {
                for (int nColumn : nColumns)
                {
                    std::chrono::microseconds writing_dt;
                    ARROW_RETURN_NOT_OK(WriteTableToParquet(nColumn, nRow, FILE_NAME, &writing_dt, chunk_size));

                    const int repeats = 3;
                    std::vector<std::chrono::microseconds> reading_all_dts(repeats);
                    std::vector<std::chrono::microseconds> reading_100_dts(repeats);
                    for (int i = 0; i < repeats; i++)
                    {
                        ARROW_RETURN_NOT_OK(ReadEntireTable(FILE_NAME, &reading_all_dts[i]));
                        ARROW_RETURN_NOT_OK(ReadColumnsAsTable(FILE_NAME, indicies, &reading_100_dts[i]));
                    }

                    // Report the fastest of the repeated runs.
                    auto reading_all_dt = *std::min_element(reading_all_dts.begin(), reading_all_dts.end());
                    auto reading_100_dt = *std::min_element(reading_100_dts.begin(), reading_100_dts.end());

                    std::cerr << "(" << nColumn << ", " << nRow << ")"
                              << ", chunk_size=" << chunk_size
                              << ", writing_dt=" << writing_dt.count() / nColumn
                              << ", reading_all_dt=" << reading_all_dt.count() / nColumn
                              << ", reading_100_dt=" << reading_100_dt.count() / 100
                              << std::endl;

                    csvFile << nColumn << ","
                            << nRow << ","
                            << chunk_size << ","
                            << writing_dt.count() << ","
                            << reading_all_dt.count() << ","
                            << reading_100_dt.count()
                            << std::endl;
                }
            }
        }

        return Status::OK();
    }
}

// Entry point: runs the benchmark and prints the failing Status to stderr
// (exit code 1) if any step returns an error.
int main(int argc, char **argv)
{
    Status st = RunMain(argc, argv);
    if (!st.ok())
    {
        std::cerr << st << std::endl;
        return 1;
    }
    return 0;
}
```

</details>

### Component(s)

Parquet

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
