This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a3f2678  ARROW-8039: [Python] Use dataset API in existing parquet 
readers and tests
a3f2678 is described below

commit a3f267833c7b3eacc1a1c44ea9c465e05107b31e
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Thu Apr 9 09:57:54 2020 -0400

    ARROW-8039: [Python] Use dataset API in existing parquet readers and tests
    
    This tests optionally using the dataset API in the pyarrow parquet 
reader implementation (`read_table` and `ParquetDataset().read()`).
    Currently, it is enabled by passing `use_legacy_dataset=False` (mechanism 
to opt in to be discussed), which allows running our existing parquet tests with 
this (the approach I now took is to parametrize the existing tests for 
use_legacy_dataset True/False).
    
    This allows users to do:
    
    ```
    table = pq.read_table("my_parquet_file_or_dir", columns=.., filters=.., 
use_legacy_dataset=False)
    ```
    
    or
    
    ```
    dataset = pq.ParquetDataset("my_parquet_dir/", use_legacy_dataset=False)
    table = dataset.read(...)
    ```
    
    and with the idea that at some point, the default for `use_legacy_dataset` 
would switch from `True` to `False`.
    
    Long term, I think we certainly want to keep `pq.read_table` (and I think 
we will also be able to support most of its keywords).
    
    The future for `pq.ParquetDataset` is less clear (it has a lot of API that 
is tied to the python implementation, eg the ParquetDatasetPiece, PartitionSet, 
ParquetPartitions, .. classes). We probably want to move people towards a "new" 
ParquetDataset that is more consistent with the new general datasets API. 
Therefore, right now the `ParquetDataset(use_legacy_dataset=False)` does not 
yet try to provide all those features, but just the `read()` method. We can 
later see which extra featu [...]
    
    Closes #6303 from jorisvandenbossche/parquet-dataset-experiment
    
    Authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 python/pyarrow/_dataset.pyx          |  10 +-
 python/pyarrow/parquet.py            | 319 +++++++++++++--
 python/pyarrow/tests/test_parquet.py | 744 ++++++++++++++++++++++++-----------
 3 files changed, 810 insertions(+), 263 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 556faa7..231588a 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -623,7 +623,8 @@ cdef class ParquetReadOptions:
     buffer_size : int, default 8192
         Size of buffered stream, if enabled. Default is 8KB.
     dictionary_columns : list of string, default None
-        Names of columns which should be read as dictionaries.
+        Names of columns which should be dictionary encoded as
+        they are read.
     """
 
     cdef public:
@@ -632,9 +633,11 @@ cdef class ParquetReadOptions:
         set dictionary_columns
 
     def __init__(self, bint use_buffered_stream=False,
-                 uint32_t buffer_size=8192,
+                 buffer_size=8192,
                  dictionary_columns=None):
         self.use_buffered_stream = use_buffered_stream
+        if buffer_size <= 0:
+            raise ValueError("Buffer size must be larger than zero")
         self.buffer_size = buffer_size
         self.dictionary_columns = set(dictionary_columns or set())
 
@@ -1191,7 +1194,8 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
                     c_options
                 )
         else:
-            raise TypeError('Must pass either paths or a FileSelector')
+            raise TypeError('Must pass either paths or a FileSelector, but '
+                            'passed {}'.format(type(paths_or_selector)))
 
         self.init(GetResultValue(result))
 
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 6563216..8586366 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -77,7 +77,7 @@ def _check_contains_null(val):
     return False
 
 
-def _check_filters(filters):
+def _check_filters(filters, check_null_strings=True):
     """
     Check if filters are well-formed.
     """
@@ -89,19 +89,94 @@ def _check_filters(filters):
             # too few:
             #   We have [(,,), ..] instead of [[(,,), ..]]
             filters = [filters]
-        for conjunction in filters:
-            for col, op, val in conjunction:
-                if (
-                    isinstance(val, list)
-                    and all(_check_contains_null(v) for v in val)
-                    or _check_contains_null(val)
-                ):
-                    raise NotImplementedError(
-                        "Null-terminated binary strings are not supported as"
-                        " filter values."
-                    )
+        if check_null_strings:
+            for conjunction in filters:
+                for col, op, val in conjunction:
+                    if (
+                        isinstance(val, list)
+                        and all(_check_contains_null(v) for v in val)
+                        or _check_contains_null(val)
+                    ):
+                        raise NotImplementedError(
+                            "Null-terminated binary strings are not supported "
+                            "as filter values."
+                        )
     return filters
 
+
+_DNF_filter_doc = """Predicates are expressed in disjunctive normal form 
(DNF), like
+    ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary boolean logical
+    combinations of single column predicates. The innermost tuples each
+    describe a single column predicate. The list of inner predicates is
+    interpreted as a conjunction (AND), forming a more selective and
+    multiple column predicate. Finally, the most outer list combines these
+    filters as a disjunction (OR).
+
+    Predicates may also be passed as List[Tuple]. This form is interpreted
+    as a single conjunction. To express OR in predicates, one must
+    use the (preferred) List[List[Tuple]] notation."""
+
+
+def _filters_to_expression(filters):
+    """
+    Check if filters are well-formed.
+
+    See _DNF_filter_doc above for more details.
+    """
+    import pyarrow.dataset as ds
+
+    if isinstance(filters, ds.Expression):
+        return filters
+
+    filters = _check_filters(filters, check_null_strings=False)
+
+    def convert_single_predicate(col, op, val):
+        field = ds.field(col)
+
+        if op == "=" or op == "==":
+            return field == val
+        elif op == "!=":
+            return field != val
+        elif op == '<':
+            return field < val
+        elif op == '>':
+            return field > val
+        elif op == '<=':
+            return field <= val
+        elif op == '>=':
+            return field >= val
+        elif op == 'in':
+            return field.isin(val)
+        elif op == 'not in':
+            return ~field.isin(val)
+        else:
+            raise ValueError(
+                '"{0}" is not a valid operator in predicates.'.format(
+                    (col, op, val)))
+
+    or_exprs = []
+
+    for conjunction in filters:
+        and_exprs = []
+        for col, op, val in conjunction:
+            and_exprs.append(convert_single_predicate(col, op, val))
+
+        expr = and_exprs[0]
+        if len(and_exprs) > 1:
+            for and_expr in and_exprs[1:]:
+                expr = ds.AndExpression(expr, and_expr)
+
+        or_exprs.append(expr)
+
+    expr = or_exprs[0]
+    if len(or_exprs) > 1:
+        expr = ds.OrExpression(*or_exprs)
+        for or_expr in or_exprs[1:]:
+            expr = ds.OrExpression(expr, or_expr)
+
+    return expr
+
+
 # ----------------------------------------------------------------------
 # Reading a single Parquet file
 
@@ -979,7 +1054,18 @@ memory_map : bool, default False
     improve performance in some environments.
 buffer_size : int, default 0
     If positive, perform read buffering when deserializing individual
-    column chunks. Otherwise IO calls are unbuffered."""
+    column chunks. Otherwise IO calls are unbuffered.
+partitioning : Partitioning or str or list of str, default "hive"
+    The partitioning scheme for a partitioned dataset. The default of "hive"
+    assumes directory names with key=value pairs like "/year=2009/month=11".
+    In addition, a scheme like "/2009/11" is also supported, in which case
+    you need to specify the field names or a full schema. See the
+    ``pyarrow.dataset.partitioning()`` function for more details.
+use_legacy_dataset : bool, default True
+    Set to False to enable the new code path (experimental, using the
+    new Arrow Dataset API). Among other things, this allows to pass
+    `filters` for all columns and not only the partition keys, enables
+    different partitioning schemes, etc."""
 
 
 class ParquetDataset:
@@ -1005,31 +1091,51 @@ split_row_groups : bool, default False
 validate_schema : bool, default True
     Check that individual file schemas are all the same / compatible.
 filters : List[Tuple] or List[List[Tuple]] or None (default)
-    List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This
-    implements partition-level (hive) filtering only, i.e., to prevent the
-    loading of some files of the dataset.
-
-    Predicates are expressed in disjunctive normal form (DNF). This means
-    that the innermost tuple describe a single column predicate. These
-    inner predicate make are all combined with a conjunction (AND) into a
-    larger predicate. The most outer list then combines all filters
-    with a disjunction (OR). By this, we should be able to express all
-    kinds of filters that are possible using boolean logic.
-
-    This function also supports passing in as List[Tuple]. These predicates
-    are evaluated as a conjunction. To express OR in predicates, one must
-    use the (preferred) List[List[Tuple]] notation.
+    Rows which do not match the filter predicate will be removed from scanned
+    data. Partition keys embedded in a nested directory structure will be
+    exploited to avoid loading files at all if they contain no matching rows.
+    If `use_legacy_dataset` is True, filters can only reference partition
+    keys and only a hive-style directory structure is supported. When
+    setting `use_legacy_dataset` to False, also within-file level filtering
+    and different partitioning schemes are supported.
+
+    {1}
 metadata_nthreads: int, default 1
     How many threads to allow the thread pool which is used to read the
     dataset metadata. Increasing this is helpful to read partitioned
     datasets.
-{}
-""".format(_read_docstring_common)
+{0}
+""".format(_read_docstring_common, _DNF_filter_doc)
+
+    def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
+                metadata=None, split_row_groups=False, validate_schema=True,
+                filters=None, metadata_nthreads=1, read_dictionary=None,
+                memory_map=False, buffer_size=0, partitioning="hive",
+                use_legacy_dataset=True):
+        if not use_legacy_dataset:
+            return _ParquetDatasetV2(path_or_paths, filesystem=filesystem,
+                                     filters=filters,
+                                     partitioning=partitioning,
+                                     read_dictionary=read_dictionary,
+                                     memory_map=memory_map,
+                                     buffer_size=buffer_size,
+                                     # unsupported keywords
+                                     schema=schema, metadata=metadata,
+                                     split_row_groups=split_row_groups,
+                                     validate_schema=validate_schema,
+                                     metadata_nthreads=metadata_nthreads)
+        self = object.__new__(cls)
+        return self
 
     def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True,
                  filters=None, metadata_nthreads=1, read_dictionary=None,
-                 memory_map=False, buffer_size=0):
+                 memory_map=False, buffer_size=0, partitioning="hive",
+                 use_legacy_dataset=True):
+        if partitioning != "hive":
+            raise ValueError(
+                'Only "hive" for hive-like partitioning is supported when '
+                'using use_legacy_dataset=True')
         self._metadata = _ParquetDatasetMetadata()
         a_path = path_or_paths
         if isinstance(a_path, list):
@@ -1252,6 +1358,112 @@ def _make_manifest(path_or_paths, fs, pathsep='/', 
metadata_nthreads=1,
     return pieces, partitions, common_metadata_path, metadata_path
 
 
+class _ParquetDatasetV2:
+    """
+    ParquetDataset shim using the Dataset API under the hood.
+    """
+    def __init__(self, path_or_paths, filesystem=None, filters=None,
+                 partitioning="hive", read_dictionary=None, buffer_size=None,
+                 memory_map=False, **kwargs):
+        import pyarrow.dataset as ds
+        import pyarrow.fs
+
+        # Raise error for not supported keywords
+        for keyword, default in [
+                ("schema", None), ("metadata", None),
+                ("split_row_groups", False), ("validate_schema", True),
+                ("metadata_nthreads", 1)]:
+            if keyword in kwargs and kwargs[keyword] is not default:
+                raise ValueError(
+                    "Keyword '{0}' is not yet supported with the new "
+                    "Dataset API".format(keyword))
+
+        # map old filesystems to new one
+        # TODO(dataset) deal with other file systems
+        if isinstance(filesystem, LocalFileSystem):
+            filesystem = pyarrow.fs.LocalFileSystem(use_mmap=memory_map)
+        elif filesystem is None and memory_map:
+            # if memory_map is specified, assume local file system (string
+            # path can in principle be URI for any filesystem)
+            filesystem = pyarrow.fs.LocalFileSystem(use_mmap=True)
+
+        # map additional arguments
+        read_options = {}
+        if buffer_size:
+            read_options.update(use_buffered_stream=True,
+                                buffer_size=buffer_size)
+        if read_dictionary is not None:
+            read_options.update(dictionary_columns=read_dictionary)
+        parquet_format = ds.ParquetFileFormat(read_options=read_options)
+
+        self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
+                                   format=parquet_format,
+                                   partitioning=partitioning)
+        self._filters = filters
+        if filters is not None:
+            self._filter_expression = _filters_to_expression(filters)
+        else:
+            self._filter_expression = None
+
+    @property
+    def schema(self):
+        return self._dataset.schema
+
+    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
+        """
+        Read (multiple) Parquet files as a single pyarrow.Table.
+
+        Parameters
+        ----------
+        columns : List[str]
+            Names of columns to read from the dataset.
+        use_threads : bool, default True
+            Perform multi-threaded column reads.
+        use_pandas_metadata : bool, default False
+            If True and file has custom pandas schema metadata, ensure that
+            index columns are also loaded.
+
+        Returns
+        -------
+        pyarrow.Table
+            Content of the file as a table (of columns).
+        """
+        # if use_pandas_metadata, we need to include index columns in the
+        # column selection, to be able to restore those in the pandas DataFrame
+        metadata = self._dataset.schema.metadata
+        if columns is not None and use_pandas_metadata:
+            if metadata and b'pandas' in metadata:
+                index_columns = set(_get_pandas_index_columns(metadata))
+                columns = columns + list(index_columns - set(columns))
+
+        table = self._dataset.to_table(
+            columns=columns, filter=self._filter_expression,
+            use_threads=use_threads
+        )
+
+        # if use_pandas_metadata, restore the pandas metadata (which gets
+        # lost if doing a specific `columns` selection in to_table)
+        if use_pandas_metadata:
+            if metadata and b"pandas" in metadata:
+                new_metadata = table.schema.metadata or {}
+                new_metadata.update({b"pandas": metadata[b"pandas"]})
+                table = table.replace_schema_metadata(new_metadata)
+
+        return table
+
+    def read_pandas(self, **kwargs):
+        """
+        Read dataset including pandas metadata, if any. Other arguments passed
+        through to ParquetDataset.read, see docstring for further details.
+        """
+        return self.read(use_pandas_metadata=True, **kwargs)
+
+    @property
+    def pieces(self):
+        # TODO raise deprecation warning
+        return list(self._dataset.get_fragments())
+
+
 _read_table_docstring = """
 {0}
 
@@ -1271,10 +1483,15 @@ metadata : FileMetaData
     If separately computed
 {1}
 filters : List[Tuple] or List[List[Tuple]] or None (default)
-    List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This
-    implements partition-level (hive) filtering only, i.e., to prevent the
-    loading of some files of the dataset if `source` is a directory.
-    See the docstring of ParquetDataset for more details.
+    Rows which do not match the filter predicate will be removed from scanned
+    data. Partition keys embedded in a nested directory structure will be
+    exploited to avoid loading files at all if they contain no matching rows.
+    If `use_legacy_dataset` is True, filters can only reference partition
+    keys and only a hive-style directory structure is supported. When
+    setting `use_legacy_dataset` to False, also within-file level filtering
+    and different partitioning schemes are supported.
+
+    {3}
 
 Returns
 -------
@@ -1285,12 +1502,32 @@ Returns
 def read_table(source, columns=None, use_threads=True, metadata=None,
                use_pandas_metadata=False, memory_map=False,
                read_dictionary=None, filesystem=None, filters=None,
-               buffer_size=0):
+               buffer_size=0, partitioning="hive", use_legacy_dataset=True):
+    if not use_legacy_dataset:
+        if not _is_path_like(source):
+            raise ValueError("File-like objects are not yet supported with "
+                             "the new Dataset API")
+
+        dataset = _ParquetDatasetV2(
+            source,
+            filesystem=filesystem,
+            partitioning=partitioning,
+            memory_map=memory_map,
+            read_dictionary=read_dictionary,
+            buffer_size=buffer_size,
+            filters=filters,
+            # unsupported keywords
+            metadata=metadata
+        )
+        return dataset.read(columns=columns, use_threads=use_threads,
+                            use_pandas_metadata=use_pandas_metadata)
+
     if _is_path_like(source):
         pf = ParquetDataset(source, metadata=metadata, memory_map=memory_map,
                             read_dictionary=read_dictionary,
                             buffer_size=buffer_size,
-                            filesystem=filesystem, filters=filters)
+                            filesystem=filesystem, filters=filters,
+                            partitioning=partitioning)
     else:
         pf = ParquetFile(source, metadata=metadata,
                          read_dictionary=read_dictionary,
@@ -1307,11 +1544,13 @@ read_table.__doc__ = _read_table_docstring.format(
     If True and file has custom pandas schema metadata, ensure that
     index columns are also loaded""")),
     """pyarrow.Table
-    Content of the file as a table (of columns)""")
+    Content of the file as a table (of columns)""",
+    _DNF_filter_doc)
 
 
 def read_pandas(source, columns=None, use_threads=True, memory_map=False,
-                metadata=None, filters=None, buffer_size=0):
+                metadata=None, filters=None, buffer_size=0,
+                use_legacy_dataset=True):
     return read_table(
         source,
         columns=columns,
@@ -1321,6 +1560,7 @@ def read_pandas(source, columns=None, use_threads=True, 
memory_map=False,
         memory_map=memory_map,
         buffer_size=buffer_size,
         use_pandas_metadata=True,
+        use_legacy_dataset=use_legacy_dataset,
     )
 
 
@@ -1330,7 +1570,8 @@ read_pandas.__doc__ = _read_table_docstring.format(
     _read_docstring_common,
     """pyarrow.Table
     Content of the file as a Table of Columns, including DataFrame
-    indexes as columns""")
+    indexes as columns""",
+    _DNF_filter_doc)
 
 
 def write_table(table, where, row_group_size=None, version='1.0',
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index 7e900f0..89131b4 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -56,6 +56,21 @@ def datadir(datadir):
     return datadir / 'parquet'
 
 
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_skip_buffer = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+
+
+def deterministic_row_order(use_legacy_dataset, chunk_size=None):
+    # TODO(datasets) ensure to use use_threads=False with the new dataset API
+    # in the tests because otherwise the row order is not deterministic
+    return False if not use_legacy_dataset and chunk_size is not None else True
+
+
 def _write_table(table, path, **kwargs):
     # So we see the ImportError somewhere
     import pyarrow.parquet as pq
@@ -72,18 +87,26 @@ def _read_table(*args, **kwargs):
 
 
 def _roundtrip_table(table, read_table_kwargs=None,
-                     write_table_kwargs=None):
+                     write_table_kwargs=None, use_legacy_dataset=True):
     read_table_kwargs = read_table_kwargs or {}
     write_table_kwargs = write_table_kwargs or {}
 
-    buf = io.BytesIO()
-    _write_table(table, buf, **write_table_kwargs)
-    buf.seek(0)
-    return _read_table(buf, **read_table_kwargs)
+    if use_legacy_dataset:
+        buf = io.BytesIO()
+        _write_table(table, buf, **write_table_kwargs)
+        buf.seek(0)
+        return _read_table(buf, **read_table_kwargs)
+    else:
+        from pyarrow.fs import _MockFileSystem
+        mockfs = _MockFileSystem()
+        with mockfs.open_output_stream("test") as out:
+            _write_table(table, out, **write_table_kwargs)
+        return _read_table("test", filesystem=mockfs, use_legacy_dataset=False,
+                           **read_table_kwargs)
 
 
 def _check_roundtrip(table, expected=None, read_table_kwargs=None,
-                     **write_table_kwargs):
+                     use_legacy_dataset=True, **write_table_kwargs):
     if expected is None:
         expected = table
 
@@ -91,31 +114,31 @@ def _check_roundtrip(table, expected=None, 
read_table_kwargs=None,
 
     # intentionally check twice
     result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
-                              write_table_kwargs=write_table_kwargs)
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
     assert result.equals(expected)
     result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
-                              write_table_kwargs=write_table_kwargs)
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
     assert result.equals(expected)
 
 
-def _roundtrip_pandas_dataframe(df, write_kwargs):
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
     table = pa.Table.from_pandas(df)
-
-    buf = io.BytesIO()
-    _write_table(table, buf, **write_kwargs)
-
-    buf.seek(0)
-    table1 = _read_table(buf)
-    return table1.to_pandas()
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
 
 
+@parametrize_legacy_dataset
 @pytest.mark.parametrize('dtype', [int, float])
-def test_single_pylist_column_roundtrip(tempdir, dtype):
+def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
     filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
     data = [pa.array(list(map(dtype, range(5))))]
     table = pa.Table.from_arrays(data, names=['a'])
     _write_table(table, filename)
-    table_read = _read_table(filename)
+    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
     for i in range(table.num_columns):
         col_written = table[i]
         col_read = table_read[i]
@@ -156,8 +179,9 @@ def alltypes_sample(size=10000, seed=0, categorical=False):
 
 
 @pytest.mark.pandas
+@parametrize_legacy_dataset
 @pytest.mark.parametrize('chunk_size', [None, 1000])
-def test_pandas_parquet_2_0_roundtrip(tempdir, chunk_size):
+def test_pandas_parquet_2_0_roundtrip(tempdir, chunk_size, use_legacy_dataset):
     df = alltypes_sample(size=10000, categorical=True)
 
     filename = tempdir / 'pandas_roundtrip.parquet'
@@ -166,10 +190,17 @@ def test_pandas_parquet_2_0_roundtrip(tempdir, 
chunk_size):
 
     _write_table(arrow_table, filename, version="2.0",
                  coerce_timestamps='ms', chunk_size=chunk_size)
-    table_read = pq.read_pandas(filename)
+    use_threads = deterministic_row_order(use_legacy_dataset, chunk_size)
+    table_read = pq.read_pandas(
+        filename, use_legacy_dataset=use_legacy_dataset,
+        use_threads=use_threads)
     assert table_read.schema.pandas_metadata is not None
 
-    assert arrow_table.schema.metadata == table_read.schema.metadata
+    read_metadata = table_read.schema.metadata
+    if not use_legacy_dataset:
+        read_metadata.pop(b"ARROW:schema")
+
+    assert arrow_table.schema.metadata == read_metadata
 
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df, df_read)
@@ -181,74 +212,85 @@ def test_parquet_invalid_version(tempdir):
         _write_table(table, tempdir / 'test_version.parquet', version="2.2")
 
 
-def test_set_data_page_size():
+@parametrize_legacy_dataset
+def test_set_data_page_size(use_legacy_dataset):
     arr = pa.array([1, 2, 3] * 100000)
     t = pa.Table.from_arrays([arr], names=['f0'])
 
     # 128K, 512K
     page_sizes = [2 << 16, 2 << 18]
     for target_page_size in page_sizes:
-        _check_roundtrip(t, data_page_size=target_page_size)
+        _check_roundtrip(t, data_page_size=target_page_size,
+                         use_legacy_dataset=use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_chunked_table_write():
+@parametrize_legacy_dataset
+def test_chunked_table_write(use_legacy_dataset):
     # ARROW-232
     df = alltypes_sample(size=10)
 
     batch = pa.RecordBatch.from_pandas(df)
     table = pa.Table.from_batches([batch] * 3)
-    _check_roundtrip(table, version='2.0')
+    _check_roundtrip(
+        table, version='2.0', use_legacy_dataset=use_legacy_dataset)
 
     df, _ = dataframe_with_lists()
     batch = pa.RecordBatch.from_pandas(df)
     table = pa.Table.from_batches([batch] * 3)
-    _check_roundtrip(table, version='2.0')
+    _check_roundtrip(
+        table, version='2.0', use_legacy_dataset=use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_memory_map(tempdir):
+@parametrize_legacy_dataset
+def test_memory_map(tempdir, use_legacy_dataset):
     df = alltypes_sample(size=10)
 
     table = pa.Table.from_pandas(df)
     _check_roundtrip(table, read_table_kwargs={'memory_map': True},
-                     version='2.0')
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
 
     filename = str(tempdir / 'tmp_file')
     with open(filename, 'wb') as f:
         _write_table(table, f, version='2.0')
-    table_read = pq.read_pandas(filename, memory_map=True)
+    table_read = pq.read_pandas(filename, memory_map=True,
+                                use_legacy_dataset=use_legacy_dataset)
     assert table_read.equals(table)
 
 
 @pytest.mark.pandas
-def test_enable_buffered_stream(tempdir):
+@parametrize_legacy_dataset
+def test_enable_buffered_stream(tempdir, use_legacy_dataset):
     df = alltypes_sample(size=10)
 
     table = pa.Table.from_pandas(df)
     _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
-                     version='2.0')
+                     version='2.0', use_legacy_dataset=use_legacy_dataset)
 
     filename = str(tempdir / 'tmp_file')
     with open(filename, 'wb') as f:
         _write_table(table, f, version='2.0')
-    table_read = pq.read_pandas(filename, buffer_size=4096)
+    table_read = pq.read_pandas(filename, buffer_size=4096,
+                                use_legacy_dataset=use_legacy_dataset)
     assert table_read.equals(table)
 
 
-def test_special_chars_filename(tempdir):
+@parametrize_legacy_dataset
+def test_special_chars_filename(tempdir, use_legacy_dataset):
     table = pa.Table.from_arrays([pa.array([42])], ["ints"])
     filename = "foo # bar"
     path = tempdir / filename
     assert not path.exists()
     _write_table(table, str(path))
     assert path.exists()
-    table_read = _read_table(str(path))
+    table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
     assert table_read.equals(table)
 
 
 @pytest.mark.pandas
-def test_empty_table_roundtrip():
+@parametrize_legacy_dataset
+def test_empty_table_roundtrip(use_legacy_dataset):
     df = alltypes_sample(size=10)
 
     # Create a non-empty table to infer the types correctly, then slice to 0
@@ -259,24 +301,28 @@ def test_empty_table_roundtrip():
 
     assert table.schema.field('null').type == pa.null()
     assert table.schema.field('null_list').type == pa.list_(pa.null())
-    _check_roundtrip(table, version='2.0')
+    _check_roundtrip(
+        table, version='2.0', use_legacy_dataset=use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_empty_table_no_columns():
+@parametrize_legacy_dataset
+def test_empty_table_no_columns(use_legacy_dataset):
     df = pd.DataFrame()
     empty = pa.Table.from_pandas(df, preserve_index=False)
-    _check_roundtrip(empty)
+    _check_roundtrip(empty, use_legacy_dataset=use_legacy_dataset)
 
 
-def test_empty_lists_table_roundtrip():
+@parametrize_legacy_dataset
+def test_empty_lists_table_roundtrip(use_legacy_dataset):
     # ARROW-2744: Shouldn't crash when writing an array of empty lists
     arr = pa.array([[], []], type=pa.list_(pa.int32()))
     table = pa.Table.from_arrays([arr], ["A"])
-    _check_roundtrip(table)
+    _check_roundtrip(table, use_legacy_dataset=use_legacy_dataset)
 
 
-def test_nested_list_nonnullable_roundtrip_bug():
+@parametrize_legacy_dataset
+def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
     # Reproduce failure in ARROW-5630
     typ = pa.list_(pa.field("item", pa.float32(), False))
     num_rows = 10000
@@ -284,11 +330,13 @@ def test_nested_list_nonnullable_roundtrip_bug():
         pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)]
                   * (num_rows // 10)), type=typ)
     ], ['a'])
-    _check_roundtrip(t, data_page_size=4096)
+    _check_roundtrip(
+        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_pandas_parquet_datetime_tz():
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_parquet_datetime_tz(use_legacy_dataset):
     s = pd.Series([datetime.datetime(2017, 9, 6)])
     s = s.dt.tz_localize('utc')
 
@@ -313,12 +361,14 @@ def test_pandas_parquet_datetime_tz():
 
 
 @pytest.mark.pandas
-def test_datetime_timezone_tzinfo():
+@parametrize_legacy_dataset
+def test_datetime_timezone_tzinfo(use_legacy_dataset):
     value = datetime.datetime(2018, 1, 1, 1, 23, 45,
                               tzinfo=datetime.timezone.utc)
     df = pd.DataFrame({'foo': [value]})
 
-    _roundtrip_pandas_dataframe(df, write_kwargs={})
+    _roundtrip_pandas_dataframe(
+        df, write_kwargs={}, use_legacy_dataset=use_legacy_dataset)
 
 
 @pytest.mark.pandas
@@ -342,7 +392,8 @@ def test_pandas_parquet_custom_metadata(tempdir):
 
 
 @pytest.mark.pandas
-def test_pandas_parquet_column_multiindex(tempdir):
+@parametrize_legacy_dataset
+def test_pandas_parquet_column_multiindex(tempdir, use_legacy_dataset):
     df = alltypes_sample(size=10)
     df.columns = pd.MultiIndex.from_tuples(
         list(zip(df.columns, df.columns[::-1])),
@@ -355,13 +406,17 @@ def test_pandas_parquet_column_multiindex(tempdir):
 
     _write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
 
-    table_read = pq.read_pandas(filename)
+    table_read = pq.read_pandas(
+        filename, use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df, df_read)
 
 
 @pytest.mark.pandas
-def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(tempdir):
+@parametrize_legacy_dataset
+def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(
+    tempdir, use_legacy_dataset
+):
     df = alltypes_sample(size=10000)
 
     filename = tempdir / 'pandas_roundtrip.parquet'
@@ -373,19 +428,25 @@ def 
test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(tempdir):
     assert js['columns']
 
     _write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
-    table_read = pq.read_pandas(filename)
+    table_read = pq.read_pandas(
+        filename, use_legacy_dataset=use_legacy_dataset)
 
     js = table_read.schema.pandas_metadata
     assert not js['index_columns']
 
-    assert arrow_table.schema.metadata == table_read.schema.metadata
+    read_metadata = table_read.schema.metadata
+    if not use_legacy_dataset:
+        read_metadata.pop(b"ARROW:schema")
+
+    assert arrow_table.schema.metadata == read_metadata
 
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df, df_read)
 
 
 @pytest.mark.pandas
-def test_pandas_parquet_1_0_roundtrip(tempdir):
+@parametrize_legacy_dataset
+def test_pandas_parquet_1_0_roundtrip(tempdir, use_legacy_dataset):
     size = 10000
     np.random.seed(0)
     df = pd.DataFrame({
@@ -407,7 +468,7 @@ def test_pandas_parquet_1_0_roundtrip(tempdir):
     filename = tempdir / 'pandas_roundtrip.parquet'
     arrow_table = pa.Table.from_pandas(df)
     _write_table(arrow_table, filename, version='1.0')
-    table_read = _read_table(filename)
+    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
 
     # We pass uint32_t as int64_t if we write Parquet version 1.0
@@ -417,12 +478,13 @@ def test_pandas_parquet_1_0_roundtrip(tempdir):
 
 
 @pytest.mark.pandas
-def test_multiple_path_types(tempdir):
+@parametrize_legacy_dataset
+def test_multiple_path_types(tempdir, use_legacy_dataset):
     # Test compatibility with PEP 519 path-like objects
     path = tempdir / 'zzz.parquet'
     df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
     _write_table(df, path)
-    table_read = _read_table(path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df, df_read)
 
@@ -430,13 +492,15 @@ def test_multiple_path_types(tempdir):
     path = str(tempdir) + 'zzz.parquet'
     df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
     _write_table(df, path)
-    table_read = _read_table(path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df, df_read)
 
 
+# TODO(dataset) duplicate column selection actually gives duplicate columns now
 @pytest.mark.pandas
-def test_pandas_column_selection(tempdir):
+@parametrize_legacy_dataset_not_supported
+def test_pandas_column_selection(tempdir, use_legacy_dataset):
     size = 10000
     np.random.seed(0)
     df = pd.DataFrame({
@@ -446,14 +510,17 @@ def test_pandas_column_selection(tempdir):
     filename = tempdir / 'pandas_roundtrip.parquet'
     arrow_table = pa.Table.from_pandas(df)
     _write_table(arrow_table, filename)
-    table_read = _read_table(filename, columns=['uint8'])
+    table_read = _read_table(
+        filename, columns=['uint8'], use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
 
     tm.assert_frame_equal(df[['uint8']], df_read)
 
     # ARROW-4267: Selection of duplicate columns still leads to these columns
     # being read uniquely.
-    table_read = _read_table(filename, columns=['uint8', 'uint8'])
+    table_read = _read_table(
+        filename, columns=['uint8', 'uint8'],
+        use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
 
     tm.assert_frame_equal(df[['uint8']], df_read)
@@ -491,20 +558,24 @@ def _test_dataframe(size=10000, seed=0):
     return df
 
 
+# TODO(ARROW-8074) NativeFile support
 @pytest.mark.pandas
-def test_pandas_parquet_native_file_roundtrip(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_parquet_native_file_roundtrip(tempdir, use_legacy_dataset):
     df = _test_dataframe(10000)
     arrow_table = pa.Table.from_pandas(df)
     imos = pa.BufferOutputStream()
     _write_table(arrow_table, imos, version="2.0")
     buf = imos.getvalue()
     reader = pa.BufferReader(buf)
-    df_read = _read_table(reader).to_pandas()
+    df_read = _read_table(
+        reader, use_legacy_dataset=use_legacy_dataset).to_pandas()
     tm.assert_frame_equal(df, df_read)
 
 
 @pytest.mark.pandas
-def test_parquet_incremental_file_build(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
     df = _test_dataframe(100)
     df['unique_id'] = 0
 
@@ -524,33 +595,40 @@ def test_parquet_incremental_file_build(tempdir):
     writer.close()
 
     buf = out.getvalue()
-    result = _read_table(pa.BufferReader(buf))
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
 
     expected = pd.concat(frames, ignore_index=True)
     tm.assert_frame_equal(result.to_pandas(), expected)
 
 
 @pytest.mark.pandas
-def test_read_pandas_column_subset(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_read_pandas_column_subset(tempdir, use_legacy_dataset):
     df = _test_dataframe(10000)
     arrow_table = pa.Table.from_pandas(df)
     imos = pa.BufferOutputStream()
     _write_table(arrow_table, imos, version="2.0")
     buf = imos.getvalue()
     reader = pa.BufferReader(buf)
-    df_read = pq.read_pandas(reader, columns=['strings', 'uint8']).to_pandas()
+    df_read = pq.read_pandas(
+        reader, columns=['strings', 'uint8'],
+        use_legacy_dataset=use_legacy_dataset
+    ).to_pandas()
     tm.assert_frame_equal(df[['strings', 'uint8']], df_read)
 
 
 @pytest.mark.pandas
-def test_pandas_parquet_empty_roundtrip(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_parquet_empty_roundtrip(tempdir, use_legacy_dataset):
     df = _test_dataframe(0)
     arrow_table = pa.Table.from_pandas(df)
     imos = pa.BufferOutputStream()
     _write_table(arrow_table, imos, version="2.0")
     buf = imos.getvalue()
     reader = pa.BufferReader(buf)
-    df_read = _read_table(reader).to_pandas()
+    df_read = _read_table(
+        reader, use_legacy_dataset=use_legacy_dataset).to_pandas()
     tm.assert_frame_equal(df, df_read)
 
 
@@ -580,7 +658,8 @@ def test_pandas_can_write_nested_data(tempdir):
 
 
 @pytest.mark.pandas
-def test_pandas_parquet_pyfile_roundtrip(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset):
     filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
     size = 5
     df = pd.DataFrame({
@@ -598,13 +677,14 @@ def test_pandas_parquet_pyfile_roundtrip(tempdir):
 
     data = io.BytesIO(filename.read_bytes())
 
-    table_read = _read_table(data)
+    table_read = _read_table(data, use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df, df_read)
 
 
 @pytest.mark.pandas
-def test_pandas_parquet_configuration_options(tempdir):
+@parametrize_legacy_dataset
+def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
     size = 10000
     np.random.seed(0)
     df = pd.DataFrame({
@@ -626,14 +706,16 @@ def test_pandas_parquet_configuration_options(tempdir):
     for use_dictionary in [True, False]:
         _write_table(arrow_table, filename, version='2.0',
                      use_dictionary=use_dictionary)
-        table_read = _read_table(filename)
+        table_read = _read_table(
+            filename, use_legacy_dataset=use_legacy_dataset)
         df_read = table_read.to_pandas()
         tm.assert_frame_equal(df, df_read)
 
     for write_statistics in [True, False]:
         _write_table(arrow_table, filename, version='2.0',
                      write_statistics=write_statistics)
-        table_read = _read_table(filename)
+        table_read = _read_table(filename,
+                                 use_legacy_dataset=use_legacy_dataset)
         df_read = table_read.to_pandas()
         tm.assert_frame_equal(df, df_read)
 
@@ -643,7 +725,8 @@ def test_pandas_parquet_configuration_options(tempdir):
             continue
         _write_table(arrow_table, filename, version='2.0',
                      compression=compression)
-        table_read = _read_table(filename)
+        table_read = _read_table(
+            filename, use_legacy_dataset=use_legacy_dataset)
         df_read = table_read.to_pandas()
         tm.assert_frame_equal(df, df_read)
 
@@ -662,7 +745,8 @@ def make_sample_file(table_or_df):
     return pq.ParquetFile(buf)
 
 
-def test_byte_stream_split():
+@parametrize_legacy_dataset
+def test_byte_stream_split(use_legacy_dataset):
     # This is only a smoke test.
     arr_float = pa.array(list(map(float, range(100))))
     arr_int = pa.array(list(map(int, range(100))))
@@ -696,26 +780,31 @@ def test_byte_stream_split():
     table = pa.Table.from_arrays([arr_int], names=['tmp'])
     with pytest.raises(IOError):
         _check_roundtrip(table, expected=table, use_byte_stream_split=True,
-                         use_dictionary=False)
+                         use_dictionary=False,
+                         use_legacy_dataset=use_legacy_dataset)
 
 
-def test_compression_level():
+@parametrize_legacy_dataset
+def test_compression_level(use_legacy_dataset):
     arr = pa.array(list(map(int, range(1000))))
     data = [arr, arr]
     table = pa.Table.from_arrays(data, names=['a', 'b'])
 
     # Check one compression level.
     _check_roundtrip(table, expected=table, compression="gzip",
-                     compression_level=1)
+                     compression_level=1,
+                     use_legacy_dataset=use_legacy_dataset)
 
     # Check another one to make sure that compression_level=1 does not
     # coincide with the default one in Arrow.
     _check_roundtrip(table, expected=table, compression="gzip",
-                     compression_level=5)
+                     compression_level=5,
+                     use_legacy_dataset=use_legacy_dataset)
 
     # Check that the user can provide a compression level per column
     _check_roundtrip(table, expected=table, compression="gzip",
-                     compression_level={'a': 2, 'b': 3})
+                     compression_level={'a': 2, 'b': 3},
+                     use_legacy_dataset=use_legacy_dataset)
 
     # Check that specifying a compression level for a codec which does allow
     # specifying one, results into an error.
@@ -1356,7 +1445,8 @@ def test_fixed_size_binary():
 
 
 @pytest.mark.pandas
-def test_multithreaded_read():
+@parametrize_legacy_dataset_skip_buffer
+def test_multithreaded_read(use_legacy_dataset):
     df = alltypes_sample(size=10000)
 
     table = pa.Table.from_pandas(df)
@@ -1365,16 +1455,19 @@ def test_multithreaded_read():
     _write_table(table, buf, compression='SNAPPY', version='2.0')
 
     buf.seek(0)
-    table1 = _read_table(buf, use_threads=True)
+    table1 = _read_table(
+        buf, use_threads=True, use_legacy_dataset=use_legacy_dataset)
 
     buf.seek(0)
-    table2 = _read_table(buf, use_threads=False)
+    table2 = _read_table(
+        buf, use_threads=False, use_legacy_dataset=use_legacy_dataset)
 
     assert table1.equals(table2)
 
 
 @pytest.mark.pandas
-def test_min_chunksize():
+@parametrize_legacy_dataset_skip_buffer
+def test_min_chunksize(use_legacy_dataset):
     data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
     table = pa.Table.from_pandas(data.reset_index())
 
@@ -1382,7 +1475,7 @@ def test_min_chunksize():
     _write_table(table, buf, chunk_size=-1)
 
     buf.seek(0)
-    result = _read_table(buf)
+    result = _read_table(buf, use_legacy_dataset=use_legacy_dataset)
 
     assert result.equals(table)
 
@@ -1581,9 +1674,10 @@ def test_partition_set_dictionary_type():
 
 
 @pytest.mark.pandas
-def test_read_partitioned_directory(tempdir):
+@parametrize_legacy_dataset
+def test_read_partitioned_directory(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
-    _partition_test_for_filesystem(fs, tempdir)
+    _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset)
 
 
 @pytest.mark.pandas
@@ -1604,7 +1698,8 @@ def test_create_parquet_dataset_multi_threaded(tempdir):
 
 
 @pytest.mark.pandas
-def test_equivalency(tempdir):
+@parametrize_legacy_dataset
+def test_equivalency(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
 
@@ -1631,7 +1726,8 @@ def test_equivalency(tempdir):
     dataset = pq.ParquetDataset(
         base_path, filesystem=fs,
         filters=[('integer', '=', 1), ('string', '!=', 'b'),
-                 ('boolean', '==', True)]
+                 ('boolean', '==', True)],
+        use_legacy_dataset=use_legacy_dataset,
     )
     table = dataset.read()
     result_df = (table.to_pandas().reset_index(drop=True))
@@ -1652,7 +1748,9 @@ def test_equivalency(tempdir):
         ],
         [('integer', '=', 0), ('boolean', '==', 'False')]
     ]
-    dataset = pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs, filters=filters,
+        use_legacy_dataset=use_legacy_dataset)
     table = dataset.read()
     result_df = table.to_pandas().reset_index(drop=True)
 
@@ -1668,19 +1766,28 @@ def test_equivalency(tempdir):
     assert df_filter_2.sum() > 0
     assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())
 
-    # Check for \0 in predicate values. Until they are correctly implemented
-    # in ARROW-3391, they would otherwise lead to weird results with the
-    # current code.
-    with pytest.raises(NotImplementedError):
-        filters = [[('string', '==', b'1\0a')]]
-        pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
-    with pytest.raises(NotImplementedError):
-        filters = [[('string', '==', '1\0a')]]
-        pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+    if use_legacy_dataset:
+        # Check for \0 in predicate values. Until they are correctly
+        # implemented in ARROW-3391, they would otherwise lead to weird
+        # results with the current code.
+        with pytest.raises(NotImplementedError):
+            filters = [[('string', '==', b'1\0a')]]
+            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+        with pytest.raises(NotImplementedError):
+            filters = [[('string', '==', '1\0a')]]
+            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+    else:
+        for filters in [[[('string', '==', b'1\0a')]],
+                        [[('string', '==', '1\0a')]]]:
+            dataset = pq.ParquetDataset(
+                base_path, filesystem=fs, filters=filters,
+                use_legacy_dataset=False)
+            assert dataset.read().num_rows == 0
 
 
 @pytest.mark.pandas
-def test_cutoff_exclusive_integer(tempdir):
+@parametrize_legacy_dataset
+def test_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
 
@@ -1702,7 +1809,8 @@ def test_cutoff_exclusive_integer(tempdir):
         filters=[
             ('integers', '<', 4),
             ('integers', '>', 1),
-        ]
+        ],
+        use_legacy_dataset=use_legacy_dataset
     )
     table = dataset.read()
     result_df = (table.to_pandas()
@@ -1714,11 +1822,14 @@ def test_cutoff_exclusive_integer(tempdir):
 
 
 @pytest.mark.pandas
+@parametrize_legacy_dataset
 @pytest.mark.xfail(
-    raises=TypeError,
+    # different error with use_legacy_dataset=False because result_df is
+    # no longer categorical
+    raises=(TypeError, AssertionError),
     reason='Loss of type information in creation of categoricals.'
 )
-def test_cutoff_exclusive_datetime(tempdir):
+def test_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
 
@@ -1746,7 +1857,8 @@ def test_cutoff_exclusive_datetime(tempdir):
         filters=[
             ('dates', '<', "2018-04-12"),
             ('dates', '>', "2018-04-10")
-        ]
+        ],
+        use_legacy_dataset=use_legacy_dataset
     )
     table = dataset.read()
     result_df = (table.to_pandas()
@@ -1761,7 +1873,8 @@ def test_cutoff_exclusive_datetime(tempdir):
 
 
 @pytest.mark.pandas
-def test_inclusive_integer(tempdir):
+@parametrize_legacy_dataset
+def test_inclusive_integer(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
 
@@ -1783,7 +1896,8 @@ def test_inclusive_integer(tempdir):
         filters=[
             ('integers', '<=', 3),
             ('integers', '>=', 2),
-        ]
+        ],
+        use_legacy_dataset=use_legacy_dataset
     )
     table = dataset.read()
     result_df = (table.to_pandas()
@@ -1795,7 +1909,8 @@ def test_inclusive_integer(tempdir):
 
 
 @pytest.mark.pandas
-def test_inclusive_set(tempdir):
+@parametrize_legacy_dataset
+def test_inclusive_set(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
 
@@ -1820,7 +1935,8 @@ def test_inclusive_set(tempdir):
     dataset = pq.ParquetDataset(
         base_path, filesystem=fs,
         filters=[('integer', 'in', {1}), ('string', 'in', {'a', 'b'}),
-                 ('boolean', 'in', {True})]
+                 ('boolean', 'in', {True})],
+        use_legacy_dataset=use_legacy_dataset
     )
     table = dataset.read()
     result_df = (table.to_pandas().reset_index(drop=True))
@@ -1831,7 +1947,8 @@ def test_inclusive_set(tempdir):
 
 
 @pytest.mark.pandas
-def test_invalid_pred_op(tempdir):
+@parametrize_legacy_dataset
+def test_invalid_pred_op(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
 
@@ -1851,23 +1968,29 @@ def test_invalid_pred_op(tempdir):
     with pytest.raises(ValueError):
         pq.ParquetDataset(base_path,
                           filesystem=fs,
-                          filters=[
-                            ('integers', '=<', 3),
-                          ])
-
-    with pytest.raises(ValueError):
-        pq.ParquetDataset(base_path,
-                          filesystem=fs,
-                          filters=[
-                            ('integers', 'in', set()),
-                          ])
+                          filters=[('integers', '=<', 3), ],
+                          use_legacy_dataset=use_legacy_dataset)
 
-    with pytest.raises(ValueError):
+    if use_legacy_dataset:
+        with pytest.raises(ValueError):
+            pq.ParquetDataset(base_path,
+                              filesystem=fs,
+                              filters=[('integers', 'in', set()), ],
+                              use_legacy_dataset=use_legacy_dataset)
+    else:
+        # Dataset API returns empty table instead
+        dataset = pq.ParquetDataset(base_path,
+                                    filesystem=fs,
+                                    filters=[('integers', 'in', set()), ],
+                                    use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().num_rows == 0
+
+    with pytest.raises(ValueError if use_legacy_dataset else TypeError):
+        # dataset API returns TypeError when trying to create an invalid
         pq.ParquetDataset(base_path,
                           filesystem=fs,
-                          filters=[
-                            ('integers', '!=', {3}),
-                          ])
+                          filters=[('integers', '!=', {3})],
+                          use_legacy_dataset=use_legacy_dataset)
 
 
 @pytest.mark.pandas
@@ -1956,7 +2079,7 @@ def test_read_partitioned_directory_s3fs(s3_example):
     dataset.read()
 
 
-def _partition_test_for_filesystem(fs, base_path):
+def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
     foo_keys = [0, 1]
     bar_keys = ['a', 'b', 'c']
     partition_spec = [
@@ -1974,7 +2097,8 @@ def _partition_test_for_filesystem(fs, base_path):
 
     _generate_partition_directories(fs, base_path, partition_spec, df)
 
-    dataset = pq.ParquetDataset(base_path, filesystem=fs)
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset)
     table = dataset.read()
     result_df = (table.to_pandas()
                  .sort_values(by='index')
@@ -1983,8 +2107,11 @@ def _partition_test_for_filesystem(fs, base_path):
     expected_df = (df.sort_values(by='index')
                    .reset_index(drop=True)
                    .reindex(columns=result_df.columns))
-    expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
-    expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
+    if use_legacy_dataset:
+        # TODO(dataset) Dataset API does not create categorical columns
+        # for partition keys
+        expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+        expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
 
     assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
 
@@ -2133,7 +2260,8 @@ def _filter_partition(df, part_keys):
 
 
 @pytest.mark.pandas
-def test_read_multiple_files(tempdir):
+@parametrize_legacy_dataset
+def test_read_multiple_files(tempdir, use_legacy_dataset):
     nfiles = 10
     size = 5
 
@@ -2159,8 +2287,11 @@ def test_read_multiple_files(tempdir):
     # Write a _SUCCESS.crc file
     (dirpath / '_SUCCESS.crc').touch()
 
-    def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
-        dataset = pq.ParquetDataset(paths, **kwargs)
+    # TODO(dataset) changed to use_threads=False because otherwise the
+    # row order is not deterministic
+    def read_multiple_files(paths, columns=None, use_threads=False, **kwargs):
+        dataset = pq.ParquetDataset(
+            paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
         return dataset.read(columns=columns, use_threads=use_threads)
 
     result = read_multiple_files(paths)
@@ -2169,13 +2300,15 @@ def test_read_multiple_files(tempdir):
     assert result.equals(expected)
 
     # Read with provided metadata
-    metadata = pq.read_metadata(paths[0])
+    # TODO(dataset) specifying metadata not yet supported
+    if use_legacy_dataset:
+        metadata = pq.read_metadata(paths[0])
 
-    result2 = read_multiple_files(paths, metadata=metadata)
-    assert result2.equals(expected)
+        result2 = read_multiple_files(paths, metadata=metadata)
+        assert result2.equals(expected)
 
-    result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
-    assert result3.equals(expected)
+        result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
+        assert result3.equals(expected)
 
     # Read column subset
     to_read = [0, 2, 6, result.num_columns - 1]
@@ -2197,6 +2330,10 @@ def test_read_multiple_files(tempdir):
     t = pa.Table.from_pandas(bad_apple)
     _write_table(t, bad_apple_path)
 
+    if not use_legacy_dataset:
+        # TODO(dataset) Dataset API skips bad files
+        return
+
     bad_meta = pq.read_metadata(bad_apple_path)
 
     with pytest.raises(ValueError):
@@ -2215,7 +2352,8 @@ def test_read_multiple_files(tempdir):
 
 
 @pytest.mark.pandas
-def test_dataset_read_pandas(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_read_pandas(tempdir, use_legacy_dataset):
     nfiles = 5
     size = 5
 
@@ -2238,7 +2376,7 @@ def test_dataset_read_pandas(tempdir):
         frames.append(df)
         paths.append(path)
 
-    dataset = pq.ParquetDataset(dirpath)
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
     columns = ['uint8', 'strings']
     result = dataset.read_pandas(columns=columns).to_pandas()
     expected = pd.concat([x[columns] for x in frames])
@@ -2247,7 +2385,8 @@ def test_dataset_read_pandas(tempdir):
 
 
 @pytest.mark.pandas
-def test_dataset_memory_map(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_memory_map(tempdir, use_legacy_dataset):
     # ARROW-2627: Check that we can use ParquetDataset with memory-mapping
     dirpath = tempdir / guid()
     dirpath.mkdir()
@@ -2257,12 +2396,16 @@ def test_dataset_memory_map(tempdir):
     table = pa.Table.from_pandas(df)
     _write_table(table, path, version='2.0')
 
-    dataset = pq.ParquetDataset(dirpath, memory_map=True)
-    assert dataset.pieces[0].read().equals(table)
+    dataset = pq.ParquetDataset(
+        dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset)
+    assert dataset.read().equals(table)
+    if use_legacy_dataset:
+        assert dataset.pieces[0].read().equals(table)
 
 
 @pytest.mark.pandas
-def test_dataset_enable_buffered_stream(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
     dirpath = tempdir / guid()
     dirpath.mkdir()
 
@@ -2272,11 +2415,15 @@ def test_dataset_enable_buffered_stream(tempdir):
     _write_table(table, path, version='2.0')
 
     with pytest.raises(ValueError):
-        pq.ParquetDataset(dirpath, buffer_size=-64)
+        pq.ParquetDataset(
+            dirpath, buffer_size=-64,
+            use_legacy_dataset=use_legacy_dataset)
 
     for buffer_size in [128, 1024]:
-        dataset = pq.ParquetDataset(dirpath, buffer_size=buffer_size)
-        assert dataset.pieces[0].read().equals(table)
+        dataset = pq.ParquetDataset(
+            dirpath, buffer_size=buffer_size,
+            use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().equals(table)
 
 
 @pytest.mark.pandas
@@ -2336,9 +2483,18 @@ def _make_example_multifile_dataset(base_path, 
nfiles=10, file_nrows=5):
     return paths
 
 
+def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
+    if use_legacy_dataset:
+        assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+    else:
+        paths = [str(path.as_posix()) for path in paths]
+        assert set(paths) == set(dataset._dataset.files)
+
+
 @pytest.mark.pandas
+@parametrize_legacy_dataset
 @pytest.mark.parametrize('dir_prefix', ['_', '.'])
-def test_ignore_private_directories(tempdir, dir_prefix):
+def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset):
     dirpath = tempdir / guid()
     dirpath.mkdir()
 
@@ -2348,12 +2504,14 @@ def test_ignore_private_directories(tempdir, 
dir_prefix):
     # private directory
     (dirpath / '{}staging'.format(dir_prefix)).mkdir()
 
-    dataset = pq.ParquetDataset(dirpath)
-    assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_ignore_hidden_files_dot(tempdir):
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset):
     dirpath = tempdir / guid()
     dirpath.mkdir()
 
@@ -2366,12 +2524,14 @@ def test_ignore_hidden_files_dot(tempdir):
     with (dirpath / '.private').open('wb') as f:
         f.write(b'gibberish')
 
-    dataset = pq.ParquetDataset(dirpath)
-    assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_ignore_hidden_files_underscore(tempdir):
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset):
     dirpath = tempdir / guid()
     dirpath.mkdir()
 
@@ -2384,12 +2544,14 @@ def test_ignore_hidden_files_underscore(tempdir):
     with (dirpath / '_started_321').open('wb') as f:
         f.write(b'abcd')
 
-    dataset = pq.ParquetDataset(dirpath)
-    assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_multiindex_duplicate_values(tempdir):
+@parametrize_legacy_dataset
+def test_multiindex_duplicate_values(tempdir, use_legacy_dataset):
     num_rows = 3
     numbers = list(range(num_rows))
     index = pd.MultiIndex.from_arrays(
@@ -2403,7 +2565,7 @@ def test_multiindex_duplicate_values(tempdir):
     filename = tempdir / 'dup_multi_index_levels.parquet'
 
     _write_table(table, filename)
-    result_table = _read_table(filename)
+    result_table = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
     assert table.equals(result_table)
 
     result_df = result_table.to_pandas()
@@ -2460,22 +2622,26 @@ def 
test_noncoerced_nanoseconds_written_without_exception(tempdir):
         pq.write_table(tb, filename, coerce_timestamps='ms', version='2.0')
 
 
-def test_read_non_existent_file(tempdir):
+@parametrize_legacy_dataset
+def test_read_non_existent_file(tempdir, use_legacy_dataset):
     path = 'non-existent-file.parquet'
     try:
-        pq.read_table(path)
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
     except Exception as e:
         assert path in e.args[0]
 
 
-def test_read_table_doesnt_warn(datadir):
+@parametrize_legacy_dataset
+def test_read_table_doesnt_warn(datadir, use_legacy_dataset):
     with pytest.warns(None) as record:
-        pq.read_table(datadir / 'v0.7.1.parquet')
+        pq.read_table(datadir / 'v0.7.1.parquet',
+                      use_legacy_dataset=use_legacy_dataset)
 
     assert len(record) == 0
 
 
 def _test_write_to_dataset_with_partitions(base_path,
+                                           use_legacy_dataset=True,
                                            filesystem=None,
                                            schema=None,
                                            index_name=None):
@@ -2505,12 +2671,19 @@ def _test_write_to_dataset_with_partitions(base_path,
     # partitioned dataset
     dataset = pq.ParquetDataset(base_path,
                                 filesystem=filesystem,
-                                validate_schema=True)
+                                validate_schema=True,
+                                use_legacy_dataset=use_legacy_dataset)
     # ARROW-2209: Ensure the dataset schema also includes the partition columns
-    dataset_cols = set(dataset.schema.to_arrow_schema().names)
+    if use_legacy_dataset:
+        dataset_cols = set(dataset.schema.to_arrow_schema().names)
+    else:
+        # NB schema property is an arrow and not parquet schema
+        dataset_cols = set(dataset.schema.names)
+
     assert dataset_cols == set(output_table.schema.names)
 
-    input_table = dataset.read()
+    use_threads = deterministic_row_order(use_legacy_dataset)
+    input_table = dataset.read(use_threads=use_threads)
     input_df = input_table.to_pandas()
 
     # Read data back in and compare with original DataFrame
@@ -2520,12 +2693,15 @@ def _test_write_to_dataset_with_partitions(base_path,
 
     # Partitioned columns become 'categorical' dtypes
     input_df = input_df[cols]
-    for col in partition_by:
-        output_df[col] = output_df[col].astype('category')
+    if use_legacy_dataset:
+        for col in partition_by:
+            output_df[col] = output_df[col].astype('category')
     assert output_df.equals(input_df)
 
 
-def _test_write_to_dataset_no_partitions(base_path, filesystem=None):
+def _test_write_to_dataset_no_partitions(base_path,
+                                         use_legacy_dataset=True,
+                                         filesystem=None):
     # ARROW-1400
     output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                               'group2': list('eefeffgeee'),
@@ -2549,8 +2725,10 @@ def _test_write_to_dataset_no_partitions(base_path, filesystem=None):
 
     # Deduplicated incoming DataFrame should match
     # original outgoing Dataframe
-    input_table = pq.ParquetDataset(base_path,
-                                    filesystem=filesystem).read()
+    input_table = pq.ParquetDataset(
+        base_path, filesystem=filesystem,
+        use_legacy_dataset=use_legacy_dataset
+    ).read()
     input_df = input_table.to_pandas()
     input_df = input_df.drop_duplicates()
     input_df = input_df[cols]
@@ -2558,28 +2736,37 @@ def _test_write_to_dataset_no_partitions(base_path, filesystem=None):
 
 
 @pytest.mark.pandas
-def test_write_to_dataset_with_partitions(tempdir):
-    _test_write_to_dataset_with_partitions(str(tempdir))
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_with_partitions(str(tempdir), use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_write_to_dataset_with_partitions_and_schema(tempdir):
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_schema(
+    tempdir, use_legacy_dataset
+):
     schema = pa.schema([pa.field('group1', type=pa.string()),
                         pa.field('group2', type=pa.string()),
                         pa.field('num', type=pa.int64()),
                         pa.field('nan', type=pa.int32()),
                         pa.field('date', type=pa.timestamp(unit='us'))])
-    _test_write_to_dataset_with_partitions(str(tempdir), schema=schema)
+    _test_write_to_dataset_with_partitions(
+        str(tempdir), use_legacy_dataset, schema=schema)
 
 
 @pytest.mark.pandas
-def test_write_to_dataset_with_partitions_and_index_name(tempdir):
-    _test_write_to_dataset_with_partitions(str(tempdir),
-                                           index_name='index_name')
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_index_name(
+    tempdir, use_legacy_dataset
+):
+    _test_write_to_dataset_with_partitions(
+        str(tempdir), use_legacy_dataset, index_name='index_name')
 
 
 @pytest.mark.pandas
-def test_write_to_dataset_no_partitions(tempdir):
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset):
     _test_write_to_dataset_no_partitions(str(tempdir))
 
 
@@ -2628,6 +2815,7 @@ def test_large_table_int32_overflow():
     _write_table(table, f)
 
 
+# TODO(ARROW-8074) buffer support
 def _simple_table_roundtrip(table, **write_kwargs):
     stream = pa.BufferOutputStream()
     _write_table(table, stream, **write_kwargs)
@@ -2696,7 +2884,8 @@ def test_list_of_binary_large_cell():
 
 
 @pytest.mark.pandas
-def test_index_column_name_duplicate(tempdir):
+@parametrize_legacy_dataset
+def test_index_column_name_duplicate(tempdir, use_legacy_dataset):
     data = {
         'close': {
             pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
@@ -2715,13 +2904,14 @@ def test_index_column_name_duplicate(tempdir):
     dfx = pd.DataFrame(data).set_index('time', drop=False)
     tdfx = pa.Table.from_pandas(dfx)
     _write_table(tdfx, path)
-    arrow_table = _read_table(path)
+    arrow_table = _read_table(path, use_legacy_dataset=use_legacy_dataset)
     result_df = arrow_table.to_pandas()
     tm.assert_frame_equal(result_df, dfx)
 
 
 @pytest.mark.pandas
-def test_parquet_nested_convenience(tempdir):
+@parametrize_legacy_dataset
+def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
     # ARROW-1684
     df = pd.DataFrame({
         'a': [[1, 2, 3], None, [4, 5], []],
@@ -2733,15 +2923,18 @@ def test_parquet_nested_convenience(tempdir):
     table = pa.Table.from_pandas(df, preserve_index=False)
     _write_table(table, path)
 
-    read = pq.read_table(path, columns=['a'])
+    read = pq.read_table(
+        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
     tm.assert_frame_equal(read.to_pandas(), df[['a']])
 
-    read = pq.read_table(path, columns=['a', 'b'])
+    read = pq.read_table(
+        path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
     tm.assert_frame_equal(read.to_pandas(), df)
 
 
 @pytest.mark.pandas
-def test_backwards_compatible_index_naming(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_naming(datadir, use_legacy_dataset):
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
  0.23      Ideal      E      SI2   61.5   55.0    326  3.95  3.98  2.43
@@ -2756,13 +2949,17 @@ carat        cut  color  clarity  depth  table  price     x     y     z
  0.23  Very Good      H      VS1   59.4   61.0    338  4.00  4.05  2.39"""
     expected = pd.read_csv(io.BytesIO(expected_string), sep=r'\s{2,}',
                            index_col=None, header=0, engine='python')
-    table = _read_table(datadir / 'v0.7.1.parquet')
+    table = _read_table(
+        datadir / 'v0.7.1.parquet', use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
 
 @pytest.mark.pandas
-def test_backwards_compatible_index_multi_level_named(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_multi_level_named(
+    datadir, use_legacy_dataset
+):
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
  0.23      Ideal      E      SI2   61.5   55.0    326  3.95  3.98  2.43
@@ -2781,13 +2978,17 @@ carat        cut  color  clarity  depth  table  price     x     y     z
         header=0, engine='python'
     ).sort_index()
 
-    table = _read_table(datadir / 'v0.7.1.all-named-index.parquet')
+    table = _read_table(datadir / 'v0.7.1.all-named-index.parquet',
+                        use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
 
 @pytest.mark.pandas
-def test_backwards_compatible_index_multi_level_some_named(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_multi_level_some_named(
+        datadir, use_legacy_dataset
+):
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
  0.23      Ideal      E      SI2   61.5   55.0    326  3.95  3.98  2.43
@@ -2807,13 +3008,17 @@ carat        cut  color  clarity  depth  table  price     x     y     z
     ).sort_index()
     expected.index = expected.index.set_names(['cut', None, 'clarity'])
 
-    table = _read_table(datadir / 'v0.7.1.some-named-index.parquet')
+    table = _read_table(datadir / 'v0.7.1.some-named-index.parquet',
+                        use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
 
 @pytest.mark.pandas
-def test_backwards_compatible_column_metadata_handling(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_column_metadata_handling(
+    datadir, use_legacy_dataset
+):
     expected = pd.DataFrame(
         {'a': [1, 2, 3], 'b': [.1, .2, .3],
          'c': pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')})
@@ -2823,15 +3028,17 @@ def test_backwards_compatible_column_metadata_handling(datadir):
         names=['index', None])
 
     path = datadir / 'v0.7.1.column-metadata-handling.parquet'
-    table = _read_table(path)
+    table = _read_table(path, use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
-    table = _read_table(path, columns=['a'])
+    table = _read_table(
+        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True))
 
 
+# TODO(dataset) support pickling
 def _make_dataset_for_pickling(tempdir, N=100):
     path = tempdir / 'data.parquet'
     fs = LocalFileSystem.get_instance()
@@ -2894,7 +3101,8 @@ def test_cloudpickle_dataset(tempdir, datadir):
 
 
 @pytest.mark.pandas
-def test_decimal_roundtrip(tempdir):
+@parametrize_legacy_dataset
+def test_decimal_roundtrip(tempdir, use_legacy_dataset):
     num_values = 10
 
     columns = {}
@@ -2914,7 +3122,8 @@ def test_decimal_roundtrip(tempdir):
     string_filename = str(filename)
     table = pa.Table.from_pandas(expected)
     _write_table(table, string_filename)
-    result_table = _read_table(string_filename)
+    result_table = _read_table(
+        string_filename, use_legacy_dataset=use_legacy_dataset)
     result = result_table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
@@ -2935,7 +3144,8 @@ def test_decimal_roundtrip_negative_scale(tempdir):
 
 
 @pytest.mark.pandas
-def test_parquet_writer_context_obj(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_context_obj(tempdir, use_legacy_dataset):
     df = _test_dataframe(100)
     df['unique_id'] = 0
 
@@ -2953,14 +3163,18 @@ def test_parquet_writer_context_obj(tempdir):
             frames.append(df.copy())
 
     buf = out.getvalue()
-    result = _read_table(pa.BufferReader(buf))
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
 
     expected = pd.concat(frames, ignore_index=True)
     tm.assert_frame_equal(result.to_pandas(), expected)
 
 
 @pytest.mark.pandas
-def test_parquet_writer_context_obj_with_exception(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_context_obj_with_exception(
+    tempdir, use_legacy_dataset
+):
     df = _test_dataframe(100)
     df['unique_id'] = 0
 
@@ -2985,21 +3199,23 @@ def test_parquet_writer_context_obj_with_exception(tempdir):
         assert str(e) == error_text
 
     buf = out.getvalue()
-    result = _read_table(pa.BufferReader(buf))
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
 
     expected = pd.concat(frames, ignore_index=True)
     tm.assert_frame_equal(result.to_pandas(), expected)
 
 
 @pytest.mark.pandas
-def test_zlib_compression_bug():
+@parametrize_legacy_dataset_skip_buffer
+def test_zlib_compression_bug(use_legacy_dataset):
     # ARROW-3514: "zlib deflate failed, output buffer too small"
     table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
     f = io.BytesIO()
     pq.write_table(table, f, compression='gzip')
 
     f.seek(0)
-    roundtrip = pq.read_table(f)
+    roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
     tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())
 
 
@@ -3051,7 +3267,8 @@ def test_empty_row_groups(tempdir):
 
 
 @pytest.mark.pandas
-def test_parquet_writer_with_caller_provided_filesystem():
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):
     out = pa.BufferOutputStream()
 
     class CustomFS(FileSystem):
@@ -3078,7 +3295,8 @@ def test_parquet_writer_with_caller_provided_filesystem():
     assert out.closed
 
     buf = out.getvalue()
-    table_read = _read_table(pa.BufferReader(buf))
+    table_read = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df_read, df)
 
@@ -3097,7 +3315,8 @@ def test_writing_empty_lists():
     _check_roundtrip(table)
 
 
-def test_write_nested_zero_length_array_chunk_failure():
+@parametrize_legacy_dataset
+def test_write_nested_zero_length_array_chunk_failure(use_legacy_dataset):
     # Bug report in ARROW-3792
     cols = OrderedDict(
         int32=pa.int32(),
@@ -3122,11 +3341,12 @@ def test_write_nested_zero_length_array_chunk_failure():
     my_batches = [pa.RecordBatch.from_arrays(batch, schema=pa.schema(cols))
                   for batch in my_arrays]
     tbl = pa.Table.from_batches(my_batches, pa.schema(cols))
-    _check_roundtrip(tbl)
+    _check_roundtrip(tbl, use_legacy_dataset=use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_partitioned_dataset(tempdir):
+@parametrize_legacy_dataset
+def test_partitioned_dataset(tempdir, use_legacy_dataset):
     # ARROW-3208: Segmentation fault when reading a Parquet partitioned dataset
     # to a Parquet file
     path = tempdir / "ARROW-3208"
@@ -3138,7 +3358,8 @@ def test_partitioned_dataset(tempdir):
     table = pa.Table.from_pandas(df)
     pq.write_to_dataset(table, root_path=str(path),
                         partition_cols=['one', 'two'])
-    table = pq.ParquetDataset(path).read()
+    table = pq.ParquetDataset(
+        path, use_legacy_dataset=use_legacy_dataset).read()
     pq.write_table(table, path / "output.parquet")
 
 
@@ -3156,7 +3377,8 @@ def test_read_column_invalid_index():
 
 
 @pytest.mark.pandas
-def test_direct_read_dictionary():
+@parametrize_legacy_dataset_skip_buffer
+def test_direct_read_dictionary(use_legacy_dataset):
     # ARROW-3325
     repeats = 10
     nunique = 5
@@ -3172,7 +3394,8 @@ def test_direct_read_dictionary():
     contents = bio.getvalue()
 
     result = pq.read_table(pa.BufferReader(contents),
-                           read_dictionary=['f0'])
+                           read_dictionary=['f0'],
+                           use_legacy_dataset=use_legacy_dataset)
 
     # Compute dictionary-encoded subfield
     expected = pa.table([table[0].dictionary_encode()], names=['f0'])
@@ -3180,14 +3403,17 @@ def test_direct_read_dictionary():
 
 
 @pytest.mark.pandas
-def test_dataset_read_dictionary(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
     path = tempdir / "ARROW-3325-dataset"
     t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
     t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
     pq.write_to_dataset(t1, root_path=str(path))
     pq.write_to_dataset(t2, root_path=str(path))
 
-    result = pq.ParquetDataset(path, read_dictionary=['f0']).read()
+    result = pq.ParquetDataset(
+        path, read_dictionary=['f0'],
+        use_legacy_dataset=use_legacy_dataset).read()
 
     # The order of the chunks is non-deterministic
     ex_chunks = [t1[0].chunk(0).dictionary_encode(),
@@ -3203,7 +3429,8 @@ def test_dataset_read_dictionary(tempdir):
 
 
 @pytest.mark.pandas
-def test_direct_read_dictionary_subfield():
+@parametrize_legacy_dataset_skip_buffer
+def test_direct_read_dictionary_subfield(use_legacy_dataset):
     repeats = 10
     nunique = 5
 
@@ -3216,7 +3443,8 @@ def test_direct_read_dictionary_subfield():
     pq.write_table(table, bio)
     contents = bio.getvalue()
     result = pq.read_table(pa.BufferReader(contents),
-                           read_dictionary=['f0.list.item'])
+                           read_dictionary=['f0.list.item'],
+                           use_legacy_dataset=use_legacy_dataset)
 
     arr = pa.array(data[0])
     values_as_dict = arr.values.dictionary_encode()
@@ -3298,7 +3526,10 @@ def test_write_to_dataset_metadata(tempdir):
     assert d1 == d2
 
 
-def test_parquet_file_too_small(tempdir):
+# TODO(dataset) better error message for invalid files (certainly if it
+#  is the only one)
+@parametrize_legacy_dataset_not_supported
+def test_parquet_file_too_small(tempdir, use_legacy_dataset):
     path = str(tempdir / "test.parquet")
     with pytest.raises(pa.ArrowInvalid,
                        match='size is 0 bytes'):
@@ -3314,7 +3545,8 @@ def test_parquet_file_too_small(tempdir):
 
 
 @pytest.mark.pandas
-def test_categorical_index_survives_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_categorical_index_survives_roundtrip(use_legacy_dataset):
     # ARROW-3652, addressed by ARROW-3246
     df = pd.DataFrame([['a', 'b'], ['c', 'd']], columns=['c1', 'c2'])
     df['c1'] = df['c1'].astype('category')
@@ -3323,13 +3555,15 @@ def test_categorical_index_survives_roundtrip():
     table = pa.Table.from_pandas(df)
     bos = pa.BufferOutputStream()
     pq.write_table(table, bos)
-    ref_df = pq.read_pandas(bos.getvalue()).to_pandas()
+    ref_df = pq.read_pandas(
+        bos.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas()
     assert isinstance(ref_df.index, pd.CategoricalIndex)
     assert ref_df.index.equals(df.index)
 
 
 @pytest.mark.pandas
-def test_categorical_order_survives_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_categorical_order_survives_roundtrip(use_legacy_dataset):
     # ARROW-6302
     df = pd.DataFrame({"a": pd.Categorical(
         ["a", "b", "c", "a"], categories=["b", "c", "d"], ordered=True)})
@@ -3339,7 +3573,8 @@ def test_categorical_order_survives_roundtrip():
     pq.write_table(table, bos)
 
     contents = bos.getvalue()
-    result = pq.read_pandas(contents).to_pandas()
+    result = pq.read_pandas(
+        contents, use_legacy_dataset=use_legacy_dataset).to_pandas()
 
     tm.assert_frame_equal(result, df)
 
@@ -3351,7 +3586,8 @@ def _simple_table_write_read(table):
     return pq.read_table(pa.BufferReader(contents))
 
 
-def test_dictionary_array_automatically_read():
+@parametrize_legacy_dataset_skip_buffer
+def test_dictionary_array_automatically_read(use_legacy_dataset):
     # ARROW-3246
 
     # Make a large dictionary, a little over 4MB of data
@@ -3416,7 +3652,8 @@ def test_field_id_metadata():
 
 
 @pytest.mark.pandas
-def test_pandas_categorical_na_type_row_groups():
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_categorical_na_type_row_groups(use_legacy_dataset):
     # ARROW-5085
     df = pd.DataFrame({"col": [None] * 100, "int": [1.0] * 100})
     df_category = df.astype({"col": "category", "int": "category"})
@@ -3426,7 +3663,8 @@ def test_pandas_categorical_na_type_row_groups():
 
     # it works
     pq.write_table(table_cat, buf, version="2.0", chunk_size=10)
-    result = pq.read_table(buf.getvalue())
+    result = pq.read_table(
+        buf.getvalue(), use_legacy_dataset=use_legacy_dataset)
 
     # Result is non-categorical
     assert result[0].equals(table[0])
@@ -3434,7 +3672,8 @@ def test_pandas_categorical_na_type_row_groups():
 
 
 @pytest.mark.pandas
-def test_pandas_categorical_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_categorical_roundtrip(use_legacy_dataset):
     # ARROW-5480, this was enabled by ARROW-3246
 
     # Have one of the categories unobserved and include a null (-1)
@@ -3446,7 +3685,8 @@ def test_pandas_categorical_roundtrip():
     buf = pa.BufferOutputStream()
     pq.write_table(pa.table(df), buf)
 
-    result = pq.read_table(buf.getvalue()).to_pandas()
+    result = pq.read_table(
+        buf.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas()
     assert result.x.dtype == 'category'
     assert (result.x.cat.categories == categories).all()
     tm.assert_frame_equal(result, df)
@@ -3495,8 +3735,9 @@ def test_multi_dataset_metadata(tempdir):
     assert md['serialized_size'] > 0
 
 
+@parametrize_legacy_dataset
 @pytest.mark.pandas
-def test_filter_before_validate_schema(tempdir):
+def test_filter_before_validate_schema(tempdir, use_legacy_dataset):
     # ARROW-4076 apply filter before schema validation
     # to avoid checking unneeded schemas
 
@@ -3513,7 +3754,8 @@ def test_filter_before_validate_schema(tempdir):
     pq.write_table(table2, dir2 / 'data.parquet')
 
     # read single file using filter
-    table = pq.read_table(tempdir, filters=[[('A', '==', 0)]])
+    table = pq.read_table(tempdir, filters=[[('A', '==', 0)]],
+                          use_legacy_dataset=use_legacy_dataset)
     assert table.column('B').equals(pa.chunked_array([[1, 2, 3]]))
 
 
@@ -3556,6 +3798,7 @@ def test_fastparquet_cross_compatibility(tempdir):
     tm.assert_frame_equal(table_fp.to_pandas(), df)
 
 
+@parametrize_legacy_dataset_skip_buffer
 @pytest.mark.parametrize('array_factory', [
     lambda: pa.array([0, None] * 10),
     lambda: pa.array([0, None] * 10).dictionary_encode(),
@@ -3564,7 +3807,9 @@ def test_fastparquet_cross_compatibility(tempdir):
 ])
 @pytest.mark.parametrize('use_dictionary', [False, True])
 @pytest.mark.parametrize('read_dictionary', [False, True])
-def test_buffer_contents(array_factory, use_dictionary, read_dictionary):
+def test_buffer_contents(
+        array_factory, use_dictionary, read_dictionary, use_legacy_dataset
+):
     # Test that null values are deterministically initialized to zero
     # after a roundtrip through Parquet.
     # See ARROW-8006 and ARROW-8011.
@@ -3574,9 +3819,66 @@ def test_buffer_contents(array_factory, use_dictionary, read_dictionary):
     bio.seek(0)
     read_dictionary = ['col'] if read_dictionary else None
     table = pq.read_table(bio, use_threads=False,
-                          read_dictionary=read_dictionary)
+                          read_dictionary=read_dictionary,
+                          use_legacy_dataset=use_legacy_dataset)
 
     for col in table.columns:
         [chunk] = col.chunks
         buf = chunk.buffers()[1]
         assert buf.to_pybytes() == buf.size * b"\0"
+
+
[email protected]
+def test_dataset_unsupported_keywords():
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, schema=pa.schema([]))
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, metadata=pa.schema([]))
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, validate_schema=False)
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, split_row_groups=True)
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, metadata_nthreads=4)
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.read_table("", use_legacy_dataset=False, metadata=pa.schema([]))
+
+
[email protected]
+def test_dataset_partitioning(tempdir):
+    import pyarrow.dataset as ds
+
+    # create small dataset with directory partitioning
+    root_path = tempdir / "test_partitioning"
+    (root_path / "2012" / "10" / "01").mkdir(parents=True)
+
+    table = pa.table({'a': [1, 2, 3]})
+    pq.write_table(
+        table, str(root_path / "2012" / "10" / "01" / "data.parquet"))
+
+    # This works with new dataset API
+
+    # read_table
+    part = ds.partitioning(field_names=["year", "month", "day"])
+    result = pq.read_table(
+        str(root_path), partitioning=part, use_legacy_dataset=False)
+    assert result.column_names == ["a", "year", "month", "day"]
+
+    result = pq.ParquetDataset(
+        str(root_path), partitioning=part, use_legacy_dataset=False).read()
+    assert result.column_names == ["a", "year", "month", "day"]
+
+    # This raises an error for legacy dataset
+    with pytest.raises(ValueError):
+        pq.read_table(
+            str(root_path), partitioning=part, use_legacy_dataset=True)
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(
+            str(root_path), partitioning=part, use_legacy_dataset=True)

Reply via email to