[ https://issues.apache.org/jira/browse/ARROW-2401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428318#comment-16428318 ]

ASF GitHub Bot commented on ARROW-2401:
---------------------------------------

xhochy closed pull request #1840: ARROW-2401 Support filters on Hive 
partitioned Parquet files
URL: https://github.com/apache/arrow/pull/1840
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 0929a1549..beeedca03 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -711,9 +711,14 @@ class ParquetDataset(object):
         Divide files into pieces for each row group in the file
     validate_schema : boolean, default True
         Check that individual file schemas are all the same / compatible
+    filters : List[Tuple] or None (default)
+        List of filters to apply, like ``[('x', '=', 0), ...]``. This
+        implements partition-level (hive) filtering only, i.e., to prevent the
+        loading of some files of the dataset.
     """
     def __init__(self, path_or_paths, filesystem=None, schema=None,
-                 metadata=None, split_row_groups=False, validate_schema=True):
+                 metadata=None, split_row_groups=False, validate_schema=True,
+                 filters=None):
         if filesystem is None:
             a_path = path_or_paths
             if isinstance(a_path, list):
@@ -744,6 +749,9 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
         if validate_schema:
             self.validate_schemas()
 
+        if filters:
+            self._filter(filters)
+
     def validate_schemas(self):
         open_file = self._get_open_file_func()
 
@@ -849,6 +857,31 @@ def open_file(path, meta=None):
                                    common_metadata=self.common_metadata)
         return open_file
 
+    def _filter(self, filters):
+        def filter_accepts_partition(part_key, filter, level):
+            p_column, p_value_index = part_key
+            f_column, op, f_value = filter
+            if p_column != f_column:
+                return True
+
+            f_value_index = self.partitions.get_index(level, p_column,
+                                                      str(f_value))
+            if op == "=":
+                return f_value_index == p_value_index
+            elif op == "!=":
+                return f_value_index != p_value_index
+            else:
+                return True
+
+        def one_filter_accepts(piece, filter):
+            return all(filter_accepts_partition(part_key, filter, level)
+                       for level, part_key in enumerate(piece.partition_keys))
+
+        def all_filters_accept(piece):
+            return all(one_filter_accepts(piece, f) for f in filters)
+
+        self.pieces = [p for p in self.pieces if all_filters_accept(p)]
+
 
 def _ensure_filesystem(fs):
     fs_type = type(fs)
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index b301de606..27d6bc781 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -996,6 +996,43 @@ def test_read_partitioned_directory(tmpdir):
     _partition_test_for_filesystem(fs, base_path)
 
 
+@parquet
+def test_read_partitioned_directory_filtered(tmpdir):
+    fs = LocalFileSystem.get_instance()
+    base_path = str(tmpdir)
+
+    import pyarrow.parquet as pq
+
+    foo_keys = [0, 1]
+    bar_keys = ['a', 'b', 'c']
+    partition_spec = [
+        ['foo', foo_keys],
+        ['bar', bar_keys]
+    ]
+    N = 30
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'foo': np.array(foo_keys, dtype='i4').repeat(15),
+        'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
+        'values': np.random.randn(N)
+    }, columns=['index', 'foo', 'bar', 'values'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[('foo', '=', 1), ('bar', '!=', 'b')]
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                 .sort_values(by='index')
+                 .reset_index(drop=True))
+
+    assert 0 not in result_df['foo'].values
+    assert 'b' not in result_df['bar'].values
+
+
 @pytest.yield_fixture
 def s3_example():
     access_key = os.environ['PYARROW_TEST_S3_ACCESS_KEY']
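
For reference, the new `filters` keyword documented in the diff above is
exercised by the added test; a minimal usage sketch (the dataset path and
partition column names below are hypothetical, not taken from the PR) would
look like this:

    import pyarrow.parquet as pq

    # Partition-level (Hive) filtering only: pieces whose partition
    # directories fail any filter are dropped before reading; no row-level
    # filtering is performed inside the files that remain.
    dataset = pq.ParquetDataset(
        '/data/events',  # hypothetical Hive-partitioned dataset root
        filters=[('year', '=', 2018), ('region', '!=', 'eu')]
    )
    table = dataset.read()
    df = table.to_pandas()

Note that _filter() only interprets the '=' and '!=' operators; any other
operator leaves a piece in place.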


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support filters on Hive partitioned Parquet files
> -------------------------------------------------
>
>                 Key: ARROW-2401
>                 URL: https://issues.apache.org/jira/browse/ARROW-2401
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: Python
>            Reporter: Julius Neuffer
>            Priority: Minor
>              Labels: features, pull-request-available
>             Fix For: 0.10.0
>
>
> I'll open a PR on GitHub to support filtering of a `ParquetDataset` along a 
> Hive partitioned directory structure.
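
For context, the Hive partitioned directory structure referred to above is
the key=value layout that _generate_partition_directories builds in the test
added by this PR, e.g. (file names illustrative):

    base_path/foo=0/bar=a/data.parquet
    base_path/foo=0/bar=b/data.parquet
    base_path/foo=1/bar=a/data.parquet
    ...

With filters=[('foo', '=', 1), ('bar', '!=', 'b')], only the pieces under
foo=1 directories whose bar value is not 'b' are kept.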



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
