ARF created ARROW-11826: --------------------------- Summary: Improve performance of repeated filtered parquet reads Key: ARROW-11826 URL: https://issues.apache.org/jira/browse/ARROW-11826 Project: Apache Arrow Issue Type: Improvement Components: Python Affects Versions: 3.0.0 Reporter: ARF
{{pq.read_table(..., filters=...)}} is slower than it probably should be... In particular, there should be a faster solution for repeatedly reading from the same file. For a 1,8 GB parquet file with about 5000 row groups (written with row-group metadata statistics), reading with a filter that selects a sub-section of a single row group takes 1.65 s. Repeated, similarly small reads from the same file each take the same amount of time with {{pq.read_table(..., filters=...)}}. I say "slower than it probably should be" because with the rudimentary example code below, the initial filtered read takes about 975 ms. Any subsequent read takes only 14ms. I have no idea what makes the C++ code slow, but in Python I observed two dominant time-wasters on for repeated queries: # {{pq.ParquetFile('test.parquet')}} takes 170ms (this might be a Windows issue), so opening the file only once helps # iterating though metadata surprisingly slow: 80 ms for a single pass, e.g. {{[pq_file.metadata.row_group(i).column(0).statistics.min for i in range(pq_file.metadata.num_row_groups)]}}. E.g. six passes add up to about 500ms. (One pass each for minimum and maximum on three columns.) Caching the statistics really helps here. {code:python} from functools import cached_property, lru_cache import pyarrow as pa import pyarrow.compute as pc import pyarrow.parquet as pq def equal(x, y, **kwargs): """Enable equal comparison for dictionary array vs. scalar.""" type_x = type(x.type) type_y = type(y.type) # fastpath if type_x is pa.DataType and type_y is pa.DataType: return pc._equal(x, y, **kwargs) if type_x is pa.DictionaryType and type_y is pa.DataType: array_arg = x scalar_arg = y elif type_x is pa.DataType and type_x is pa.DictionaryType: array_arg = y scalar_arg = x else: # fallback to default implemetation (which will error out...) return pc._equal(x, y, **kwargs) # have dictionary vs. scalar comparison def chunk_generator(): for chunk in array_arg.iterchunks(): # for large dictionaries use pyarrow to search index if len(chunk.dictionary) > 30: index = pc.index_in(scalar_arg, options=pc.SetLookupOptions( value_set=chunk.dictionary)) if index.is_valid: yield pc.equal(chunk.indices, index) continue # for small dictionaries index search in python is faster try: index = chunk.dictionary.to_pylist().index(scalar_arg.as_py()) yield pc.equal(chunk.indices, pa.scalar(index, chunk.indices.type)) continue except ValueError: pass # value not found in dictionary yield pa.nulls(len(chunk), pa.bool_()).fill_null(False) return pa.chunked_array(chunk_generator()) pc._equal = pc.equal pc.equal = equal @lru_cache(maxsize=None) def column_names_from_parquet_file(parquet_file): """"Returns a dict associating column names with column number for a given parquet file. The result is cached.""" return { parquet_file.metadata.row_group(0).column(i).path_in_schema: i for i in range(parquet_file.metadata.row_group(0).num_columns) } @lru_cache(maxsize=None) def metadata_from_parquet_file(parquet_file, field_name): """Returns a tuple(min_, max_) where min_ and max_ are lists with the row group metadata statistics min and max respectively. The result is cached.""" column_id = column_names_from_parquet_file(parquet_file)[field_name] pq_metadata = parquet_file.metadata min_ = [ pq_metadata.row_group(i).column(column_id).statistics.min for i in range(pq_metadata.num_row_groups) ] max_ = [ pq_metadata.row_group(i).column(column_id).statistics.max for i in range(pq_metadata.num_row_groups) ] return min_, max_ class Node: """Base class for a node in a computation graph.""" def __init__(self, left=None, right=None): self.left = left self.right = right def __eq__(self, other): return Node_Equal(self, other) def __and__(self, other): return Node_And(self, other) def __or__(self, other): return Node_Or(self, other) def filter_table(self, table): """Applies the computation graph as a filter on a table.""" mask = self.evaluate_on_table(table) return table.filter(mask) class Node_Equal(Node): def _identify_field_and_literal(self): if isinstance(self.left, Field): self.__dict__['field'] = field = self.left self.__dict__['literal'] = literal = self.right else: self.field = self.right self.literal = self.left assert isinstance(literal, (str, float, int)) return field, literal @cached_property def field(self): field = self.__dict__.get( 'field', None) or self._identify_field_and_literal()[0] return field @cached_property def literal(self): literal = self.__dict__.get( 'literal', None) or self._identify_field_and_literal()[1] return literal def evaluate_on_metadata(self, parquet_file, row_groups=None): field = self.field literal = self.literal min_, max_ = metadata_from_parquet_file(parquet_file, field.name) row_groups = row_groups or range(len(min_)) return [ i for i in row_groups if min_[i] <= literal <= max_[i] ] def evaluate_on_table(self, table): field = self.field literal = self.literal column = table[field.name] if type(column.type) is pa.DictionaryType: pa_scalar = pa.scalar(literal, column.type.value_type) else: pa_scalar = pa.scalar(literal, column.type) return pc.equal(column, pa_scalar) class Node_And(Node): def evaluate_on_metadata(self, parquet_file, row_groups=None): filtered_row_groups = self.left.evaluate_on_metadata( parquet_file, row_groups) return self.right.evaluate_on_metadata(parquet_file, filtered_row_groups) def evaluate_on_table(self, table): mask1 = self.left.evaluate_on_table(table) mask2 = self.right.evaluate_on_table(table) return pc.and_(mask1, mask2) class Node_Or(Node): def evaluate_on_metadata(self, parquet_file, row_groups=None): row_groups1 = self.left.evaluate_on_metadata(parquet_file, row_groups) row_groups2 = self.right.evaluate_on_metadata(parquet_file, row_groups) return sorted(set(row_groups1) | set(row_groups2)) def evaluate_on_table(self, table): mask1 = self.left.evaluate_on_table(table) mask2 = self.right.evaluate_on_table(table) return pc.or_(mask1, mask2) class Field: def __init__(self, name): self.name = name def __eq__(self, other): return Node_Equal(self, other) def read_table_filtered(parquet_file, filter=None): """Fast filtered read from a parquet file.""" if isinstance(parquet_file, str): parquet_file = pq.ParquetFile(parquet_file) row_groups = filter.evaluate_on_metadata(parquet_file) table = parquet_file.read_row_groups(row_groups) return filter.filter_table(table) if __name__ == '__main__': pq_file = pq.ParquetFile('test.parquet') filter = (Field('code') == 12345) & ( Field('foo') == 'foo value') & (Field('bar') == 'bar value') table = read_table_filtered(pq_file, filter) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)