[ https://issues.apache.org/jira/browse/ARROW-2401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428318#comment-16428318 ]
ASF GitHub Bot commented on ARROW-2401: --------------------------------------- xhochy closed pull request #1840: ARROW-2401 Support filters on Hive partitioned Parquet files URL: https://github.com/apache/arrow/pull/1840 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 0929a1549..beeedca03 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -711,9 +711,14 @@ class ParquetDataset(object): Divide files into pieces for each row group in the file validate_schema : boolean, default True Check that individual file schemas are all the same / compatible + filters : List[Tuple] or None (default) + List of filters to apply, like ``[('x', '=', 0), ...]``. This + implements partition-level (hive) filtering only, i.e., to prevent the + loading of some files of the dataset. """ def __init__(self, path_or_paths, filesystem=None, schema=None, - metadata=None, split_row_groups=False, validate_schema=True): + metadata=None, split_row_groups=False, validate_schema=True, + filters=None): if filesystem is None: a_path = path_or_paths if isinstance(a_path, list): @@ -744,6 +749,9 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, if validate_schema: self.validate_schemas() + if filters: + self._filter(filters) + def validate_schemas(self): open_file = self._get_open_file_func() @@ -849,6 +857,31 @@ def open_file(path, meta=None): common_metadata=self.common_metadata) return open_file + def _filter(self, filters): + def filter_accepts_partition(part_key, filter, level): + p_column, p_value_index = part_key + f_column, op, f_value = filter + if p_column != f_column: + return True + + f_value_index = self.partitions.get_index(level, p_column, + str(f_value)) + if op == "=": + return f_value_index == p_value_index + elif op == "!=": + return f_value_index != p_value_index + else: + return True + + def one_filter_accepts(piece, filter): + return all(filter_accepts_partition(part_key, filter, level) + for level, part_key in enumerate(piece.partition_keys)) + + def all_filters_accept(piece): + return all(one_filter_accepts(piece, f) for f in filters) + + self.pieces = [p for p in self.pieces if all_filters_accept(p)] + def _ensure_filesystem(fs): fs_type = type(fs) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index b301de606..27d6bc781 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -996,6 +996,43 @@ def test_read_partitioned_directory(tmpdir): _partition_test_for_filesystem(fs, base_path) +@parquet +def test_read_partitioned_directory_filtered(tmpdir): + fs = LocalFileSystem.get_instance() + base_path = str(tmpdir) + + import pyarrow.parquet as pq + + foo_keys = [0, 1] + bar_keys = ['a', 'b', 'c'] + partition_spec = [ + ['foo', foo_keys], + ['bar', bar_keys] + ] + N = 30 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'foo': np.array(foo_keys, dtype='i4').repeat(15), + 'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2), + 'values': np.random.randn(N) + }, columns=['index', 'foo', 'bar', 'values']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + dataset = pq.ParquetDataset( + base_path, filesystem=fs, + filters=[('foo', '=', 1), ('bar', '!=', 'b')] + ) + table = dataset.read() + result_df = (table.to_pandas() + .sort_values(by='index') + .reset_index(drop=True)) + + assert 0 not in result_df['foo'].values + assert 'b' not in result_df['bar'].values + + @pytest.yield_fixture def s3_example(): access_key = os.environ['PYARROW_TEST_S3_ACCESS_KEY'] ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support filters on Hive partitioned Parquet files > ------------------------------------------------- > > Key: ARROW-2401 > URL: https://issues.apache.org/jira/browse/ARROW-2401 > Project: Apache Arrow > Issue Type: New Feature > Components: Python > Reporter: Julius Neuffer > Priority: Minor > Labels: features, pull-request-available > Fix For: 0.10.0 > > > I'll open a PR on GitHub to support filtering of a `ParquetDataset` along a > Hive partitioned directory structure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)