marcin-krystianc opened a new issue, #38149: URL: https://github.com/apache/arrow/issues/38149
### Describe the bug, including details regarding any error messages, version, and platform.

Hi, this is related to https://github.com/apache/arrow/issues/38087 but it covers a different problem.

As in the previous issue, our use case is reading a small number of columns (e.g. 100) from a Parquet file that contains many more columns (e.g. 20k). The problem is that the more columns the file contains, the more time is needed to read any particular column (repro code: https://github.com/marcin-krystianc/arrow_issue_2023-10-06).

In the graphs below (produced with https://github.com/marcin-krystianc/arrow_issue_2023-10-06/blob/master/plot_results.py), we can clearly see that when we read 100 columns from a Parquet file (the orange line), the time to read a single column grows with the number of columns in the file. However, when we read the entire file (all columns), the time to read a single column depends only weakly on the number of columns in the file. There is still some correlation, but it is much weaker.

<img width="1916" alt="Screenshot 2023-10-06 143748" src="https://github.com/apache/arrow/assets/2308005/02518d4e-7def-4e3c-91fb-12e37c1b7c1b">
<img width="1919" alt="Screenshot 2023-10-06 151957" src="https://github.com/apache/arrow/assets/2308005/41eb07cb-65c0-4bc1-bbef-6d73cbece2f7">

Both Python and C++ exhibit the same problem, which is not a surprise since Python delegates Parquet file reading to C++ anyway.

According to my analysis, there is a simple explanation for the reported problem. When we create a `FileReader`, it reads and parses the entire metadata section of the file. Since the metadata section contains information about all columns, much of that metadata reading and parsing is wasted work when we read only a tiny fraction of the columns.
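To make the explanation concrete, here is a minimal sketch of a linear cost model. It assumes the metadata cost is proportional to the total number of columns in the file and the data cost is proportional to the number of columns actually read. The function name and both cost constants are hypothetical placeholders chosen for illustration; they are not measurements from the benchmark.

``` python
def per_column_time_us(total_columns, columns_read,
                       metadata_us_per_column=5.0,   # hypothetical, not measured
                       data_us_per_column=200.0):    # hypothetical, not measured
    """Per-column read time under a simple linear cost model."""
    # Metadata is parsed for every column in the file, whether it is read or not.
    metadata_cost = metadata_us_per_column * total_columns
    # Data pages are decoded only for the columns that were requested.
    data_cost = data_us_per_column * columns_read
    return (metadata_cost + data_cost) / columns_read


for total in (1_000, 10_000, 50_000):
    subset = per_column_time_us(total, columns_read=100)
    full = per_column_time_us(total, columns_read=total)
    print(f"{total:>6} columns: read 100 -> {subset:.1f} us/col, "
          f"read all -> {full:.1f} us/col")
```

Under these assumptions the per-column time for a 100-column read grows linearly with the file width, because the fixed metadata cost is divided among only 100 columns, while for a full read it stays constant. The real full-read curve still shows a weak correlation, so this model is only a first approximation.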
<details>
<summary>Python code:</summary>

``` python
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import time
import polars as pl
import csv
import gc

t_write = []
t_read_100_pre_buffer = []

path = "/tmp/test_wide.parquet"
columns_list = [
    100, 200, 300, 400, 500, 600, 700, 800, 900,
    1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
    10_000, 20_000, 30_000, 40_000, 50_000,
]
chunks_list = [1000, 10_000]
rows_list = [5000]

with open('results_python.csv', 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    # write the header
    writer.writerow(['columns','rows','chunk_size','writing(μs)','reading_all(μs)','reading_100(μs)'])

    for chunk_size in chunks_list:
        for rows in rows_list:
            for columns in columns_list:
                table = pl.DataFrame(
                    data=np.random.randn(rows, columns),
                    schema=[f"c{i}" for i in range(columns)]).to_arrow()

                t = time.time()
                pq.write_table(table, path, row_group_size=chunk_size, use_dictionary=False, write_statistics=False)
                t_writing = time.time() - t
                t_write.append(t_writing)

                del table
                gc.collect()

                t_read = []
                t_read_100 = []

                # Repeat each read three times and keep the fastest run.
                for i in range(0, 3):
                    t = time.time()
                    res = pq.read_table(path, use_threads=False)
                    t_read.append(time.time() - t)
                    del res
                    gc.collect()

                    t = time.time()
                    res_100 = pq.read_table(path, columns=[f"c{i}" for i in range(100)], use_threads=False)
                    t_read_100.append(time.time() - t)
                    del res_100
                    gc.collect()

                t_reading = min(t_read)
                t_reading_100 = min(t_read_100)

                data = [columns, rows, chunk_size, t_writing * 1_000_000, t_reading * 1_000_000, t_reading_100 * 1_000_000]
                writer.writerow(data)
                print(str(data))
```

</details>

<details>
<summary>C++ code:</summary>

``` cpp
#include "arrow/api.h"
#include "arrow/io/api.h"
#include "arrow/result.h"
#include "arrow/util/type_fwd.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"

#include <algorithm>
#include <chrono>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <list>
#include <numeric>
#include <random>
#include <stdexcept>
#include <string>
#include <vector>

using arrow::Status;

namespace
{
  const char *FILE_NAME = "/tmp/my_cpp.parquet";

  std::shared_ptr<arrow::Table> GetTable(size_t nColumns, size_t nRows)
  {
    std::random_device dev;
    std::mt19937 rng(dev());
    std::uniform_real_distribution<> rand_gen(0.0, 1.0);

    std::vector<std::shared_ptr<arrow::Array>> arrays;
    std::vector<std::shared_ptr<arrow::Field>> fields;

    // For simplicity, we'll create float64 columns. You can expand this to handle other types.
    for (size_t i = 0; i < nColumns; i++)
    {
      arrow::DoubleBuilder builder;
      for (size_t j = 0; j < nRows; j++)
      {
        if (!builder.Append(rand_gen(rng)).ok())
          throw std::runtime_error("builder.Append");
      }

      std::shared_ptr<arrow::Array> array;
      if (!builder.Finish(&array).ok())
        throw std::runtime_error("builder.Finish");

      arrays.push_back(array);
      fields.push_back(arrow::field("c_" + std::to_string(i), arrow::float64(), false));
    }

    auto table = arrow::Table::Make(arrow::schema(fields), arrays);
    return table;
  }

  Status WriteTableToParquet(size_t nColumns, size_t nRows, const std::string &filename, std::chrono::microseconds *dt, int64_t chunkSize)
  {
    auto table = GetTable(nColumns, nRows);
    auto begin = std::chrono::steady_clock::now();
    auto result = arrow::io::FileOutputStream::Open(filename);
    auto outfile = result.ValueOrDie();
    parquet::WriterProperties::Builder builder;
    auto properties = builder
                          .max_row_group_length(chunkSize)
                          ->disable_dictionary()
                          ->build();
    PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, chunkSize, properties));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status ReadEntireTable(const std::string &filename, std::chrono::microseconds *dt)
  {
    auto begin = std::chrono::steady_clock::now();

    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
    std::shared_ptr<arrow::Table> parquet_table;

    // Read the table.
    PARQUET_THROW_NOT_OK(reader->ReadTable(&parquet_table));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status ReadColumnsAsTable(const std::string &filename, const std::vector<int> &indices, std::chrono::microseconds *dt)
  {
    auto begin = std::chrono::steady_clock::now();

    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

    // Read only the selected columns.
    std::shared_ptr<arrow::Table> parquet_table;
    PARQUET_THROW_NOT_OK(reader->ReadTable(indices, &parquet_table));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status RunMain(int argc, char **argv)
  {
    std::ofstream csvFile;
    csvFile.open("results_cpp.csv", std::ios_base::out); // overwrites any existing file
    csvFile << "columns, rows, chunk_size, writing(μs), reading_all(μs), reading_100(μs)" << std::endl;

    std::list<int> nColumns = {
        100, 200, 300, 400, 500, 600, 700, 800, 900,
        1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
        10000, 20000, 30000, 40000, 50000};

    std::list<int64_t> chunk_sizes = {1000, 100000};
    std::list<int> rows_list = {5000};

    // Indices of the first 100 columns.
    std::vector<int> indices(100);
    std::iota(indices.begin(), indices.end(), 0);

    for (auto chunk_size : chunk_sizes)
    {
      for (int nRow : rows_list)
      {
        for (int nColumn : nColumns)
        {
          std::chrono::microseconds writing_dt;
          ARROW_RETURN_NOT_OK(WriteTableToParquet(nColumn, nRow, FILE_NAME, &writing_dt, chunk_size));

          // Repeat each read and keep the fastest run.
          const int repeats = 3;
          std::vector<std::chrono::microseconds> reading_all_dts(repeats);
          std::vector<std::chrono::microseconds> reading_100_dts(repeats);
          for (int i = 0; i < repeats; i++)
          {
            ARROW_RETURN_NOT_OK(ReadEntireTable(FILE_NAME, &reading_all_dts[i]));
            ARROW_RETURN_NOT_OK(ReadColumnsAsTable(FILE_NAME, indices, &reading_100_dts[i]));
          }

          auto reading_all_dt = *std::min_element(reading_all_dts.begin(), reading_all_dts.end());
          auto reading_100_dt = *std::min_element(reading_100_dts.begin(), reading_100_dts.end());

          // Per-column times go to stderr; raw totals go to the CSV file.
          std::cerr << "(" << nColumn << ", " << nRow << ")"
                    << ", chunk_size=" << chunk_size
                    << ", writing_dt=" << writing_dt.count() / nColumn
                    << ", reading_all_dt=" << reading_all_dt.count() / nColumn
                    << ", reading_100_dt=" << reading_100_dt.count() / 100
                    << std::endl;

          csvFile << nColumn << ","
                  << nRow << ","
                  << chunk_size << ","
                  << writing_dt.count() << ","
                  << reading_all_dt.count() << ","
                  << reading_100_dt.count()
                  << std::endl;
        }
      }
    }

    return Status::OK();
  }
}

int main(int argc, char **argv)
{
  Status st = RunMain(argc, argv);
  if (!st.ok())
  {
    std::cerr << st << std::endl;
    return 1;
  }
  return 0;
}
```

</details>

### Component(s)

C++, Parquet

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
