This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a3f2678 ARROW-8039: [Python] Use dataset API in existing parquet
readers and tests
a3f2678 is described below
commit a3f267833c7b3eacc1a1c44ea9c465e05107b31e
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Thu Apr 9 09:57:54 2020 -0400
ARROW-8039: [Python] Use dataset API in existing parquet readers and tests
This is testing to optionally use the dataset API in the pyarrow parquet
reader implementation (`read_table` and `ParquetDataset().read()`).
Currently, it is enabled by passing `use_legacy_dataset=False` (mechanism
to opt in to be discussed), which allows us to run our existing parquet tests with
this (the approach I now took is to parametrize the existing tests for
use_legacy_dataset True/False).
This allows users to do:
```
table = pq.read_table("my_parquet_file_or_dir", columns=.., filters=..,
use_legacy_dataset=False)
```
or
```
dataset = pq.ParquetDataset("my_parquet_dir/", use_legacy_dataset=False)
table = dataset.read(...)
```
and with the idea that at some point, the default for `use_legacy_dataset`
would switch from `True` to `False`.
Long term, I think we certainly want to keep `pq.read_table` (and I think
we will also be able to support most of its keywords).
The future for `pq.ParquetDataset` is less clear (it has a lot of API that
is tied to the python implementation, eg the ParquetDatasetPiece, PartitionSet,
ParquetPartitions, .. classes). We probably want to move people towards a "new"
ParquetDataset that is more consistent with the new general datasets API.
Therefore, right now the `ParquetDataset(use_legacy_dataset=False)` does not
yet try to provide all those features, but just the `read()` method. We can
later see which extra featu [...]
Closes #6303 from jorisvandenbossche/parquet-dataset-experiment
Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
python/pyarrow/_dataset.pyx | 10 +-
python/pyarrow/parquet.py | 319 +++++++++++++--
python/pyarrow/tests/test_parquet.py | 744 ++++++++++++++++++++++++-----------
3 files changed, 810 insertions(+), 263 deletions(-)
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 556faa7..231588a 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -623,7 +623,8 @@ cdef class ParquetReadOptions:
buffer_size : int, default 8192
Size of buffered stream, if enabled. Default is 8KB.
dictionary_columns : list of string, default None
- Names of columns which should be read as dictionaries.
+ Names of columns which should be dictionary encoded as
+ they are read.
"""
cdef public:
@@ -632,9 +633,11 @@ cdef class ParquetReadOptions:
set dictionary_columns
def __init__(self, bint use_buffered_stream=False,
- uint32_t buffer_size=8192,
+ buffer_size=8192,
dictionary_columns=None):
self.use_buffered_stream = use_buffered_stream
+ if buffer_size <= 0:
+ raise ValueError("Buffer size must be larger than zero")
self.buffer_size = buffer_size
self.dictionary_columns = set(dictionary_columns or set())
@@ -1191,7 +1194,8 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
c_options
)
else:
- raise TypeError('Must pass either paths or a FileSelector')
+ raise TypeError('Must pass either paths or a FileSelector, but '
+ 'passed {}'.format(type(paths_or_selector)))
self.init(GetResultValue(result))
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 6563216..8586366 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -77,7 +77,7 @@ def _check_contains_null(val):
return False
-def _check_filters(filters):
+def _check_filters(filters, check_null_strings=True):
"""
Check if filters are well-formed.
"""
@@ -89,19 +89,94 @@ def _check_filters(filters):
# too few:
# We have [(,,), ..] instead of [[(,,), ..]]
filters = [filters]
- for conjunction in filters:
- for col, op, val in conjunction:
- if (
- isinstance(val, list)
- and all(_check_contains_null(v) for v in val)
- or _check_contains_null(val)
- ):
- raise NotImplementedError(
- "Null-terminated binary strings are not supported as"
- " filter values."
- )
+ if check_null_strings:
+ for conjunction in filters:
+ for col, op, val in conjunction:
+ if (
+ isinstance(val, list)
+ and all(_check_contains_null(v) for v in val)
+ or _check_contains_null(val)
+ ):
+ raise NotImplementedError(
+ "Null-terminated binary strings are not supported "
+ "as filter values."
+ )
return filters
+
+_DNF_filter_doc = """Predicates are expressed in disjunctive normal form
(DNF), like
+ ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary boolean logical
+ combinations of single column predicates. The innermost tuples each
+ describe a single column predicate. The list of inner predicates is
+ interpreted as a conjunction (AND), forming a more selective and
+ multiple column predicate. Finally, the most outer list combines these
+ filters as a disjunction (OR).
+
+ Predicates may also be passed as List[Tuple]. This form is interpreted
+ as a single conjunction. To express OR in predicates, one must
+ use the (preferred) List[List[Tuple]] notation."""
+
+
+def _filters_to_expression(filters):
+ """
+ Check if filters are well-formed.
+
+ See _DNF_filter_doc above for more details.
+ """
+ import pyarrow.dataset as ds
+
+ if isinstance(filters, ds.Expression):
+ return filters
+
+ filters = _check_filters(filters, check_null_strings=False)
+
+ def convert_single_predicate(col, op, val):
+ field = ds.field(col)
+
+ if op == "=" or op == "==":
+ return field == val
+ elif op == "!=":
+ return field != val
+ elif op == '<':
+ return field < val
+ elif op == '>':
+ return field > val
+ elif op == '<=':
+ return field <= val
+ elif op == '>=':
+ return field >= val
+ elif op == 'in':
+ return field.isin(val)
+ elif op == 'not in':
+ return ~field.isin(val)
+ else:
+ raise ValueError(
+ '"{0}" is not a valid operator in predicates.'.format(
+ (col, op, val)))
+
+ or_exprs = []
+
+ for conjunction in filters:
+ and_exprs = []
+ for col, op, val in conjunction:
+ and_exprs.append(convert_single_predicate(col, op, val))
+
+ expr = and_exprs[0]
+ if len(and_exprs) > 1:
+ for and_expr in and_exprs[1:]:
+ expr = ds.AndExpression(expr, and_expr)
+
+ or_exprs.append(expr)
+
+ expr = or_exprs[0]
+ if len(or_exprs) > 1:
+ expr = ds.OrExpression(*or_exprs)
+ for or_expr in or_exprs[1:]:
+ expr = ds.OrExpression(expr, or_expr)
+
+ return expr
+
+
# ----------------------------------------------------------------------
# Reading a single Parquet file
@@ -979,7 +1054,18 @@ memory_map : bool, default False
improve performance in some environments.
buffer_size : int, default 0
If positive, perform read buffering when deserializing individual
- column chunks. Otherwise IO calls are unbuffered."""
+ column chunks. Otherwise IO calls are unbuffered.
+partitioning : Partitioning or str or list of str, default "hive"
+ The partitioning scheme for a partitioned dataset. The default of "hive"
+ assumes directory names with key=value pairs like "/year=2009/month=11".
+ In addition, a scheme like "/2009/11" is also supported, in which case
+ you need to specify the field names or a full schema. See the
+ ``pyarrow.dataset.partitioning()`` function for more details.
+use_legacy_dataset : bool, default True
+ Set to False to enable the new code path (experimental, using the
+ new Arrow Dataset API). Among other things, this allows to pass
+ `filters` for all columns and not only the partition keys, enables
+ different partitioning schemes, etc."""
class ParquetDataset:
@@ -1005,31 +1091,51 @@ split_row_groups : bool, default False
validate_schema : bool, default True
Check that individual file schemas are all the same / compatible.
filters : List[Tuple] or List[List[Tuple]] or None (default)
- List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This
- implements partition-level (hive) filtering only, i.e., to prevent the
- loading of some files of the dataset.
-
- Predicates are expressed in disjunctive normal form (DNF). This means
- that the innermost tuple describe a single column predicate. These
- inner predicate make are all combined with a conjunction (AND) into a
- larger predicate. The most outer list then combines all filters
- with a disjunction (OR). By this, we should be able to express all
- kinds of filters that are possible using boolean logic.
-
- This function also supports passing in as List[Tuple]. These predicates
- are evaluated as a conjunction. To express OR in predicates, one must
- use the (preferred) List[List[Tuple]] notation.
+ Rows which do not match the filter predicate will be removed from scanned
+ data. Partition keys embedded in a nested directory structure will be
+ exploited to avoid loading files at all if they contain no matching rows.
+ If `use_legacy_dataset` is True, filters can only reference partition
+ keys and only a hive-style directory structure is supported. When
+ setting `use_legacy_dataset` to False, also within-file level filtering
+ and different partitioning schemes are supported.
+
+ {1}
metadata_nthreads: int, default 1
How many threads to allow the thread pool which is used to read the
dataset metadata. Increasing this is helpful to read partitioned
datasets.
-{}
-""".format(_read_docstring_common)
+{0}
+""".format(_read_docstring_common, _DNF_filter_doc)
+
+ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
+ metadata=None, split_row_groups=False, validate_schema=True,
+ filters=None, metadata_nthreads=1, read_dictionary=None,
+ memory_map=False, buffer_size=0, partitioning="hive",
+ use_legacy_dataset=True):
+ if not use_legacy_dataset:
+ return _ParquetDatasetV2(path_or_paths, filesystem=filesystem,
+ filters=filters,
+ partitioning=partitioning,
+ read_dictionary=read_dictionary,
+ memory_map=memory_map,
+ buffer_size=buffer_size,
+ # unsupported keywords
+ schema=schema, metadata=metadata,
+ split_row_groups=split_row_groups,
+ validate_schema=validate_schema,
+ metadata_nthreads=metadata_nthreads)
+ self = object.__new__(cls)
+ return self
def __init__(self, path_or_paths, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1, read_dictionary=None,
- memory_map=False, buffer_size=0):
+ memory_map=False, buffer_size=0, partitioning="hive",
+ use_legacy_dataset=True):
+ if partitioning != "hive":
+ raise ValueError(
+ 'Only "hive" for hive-like partitioning is supported when '
+ 'using use_legacy_dataset=True')
self._metadata = _ParquetDatasetMetadata()
a_path = path_or_paths
if isinstance(a_path, list):
@@ -1252,6 +1358,112 @@ def _make_manifest(path_or_paths, fs, pathsep='/',
metadata_nthreads=1,
return pieces, partitions, common_metadata_path, metadata_path
+class _ParquetDatasetV2:
+ """
+ ParquetDataset shim using the Dataset API under the hood.
+ """
+ def __init__(self, path_or_paths, filesystem=None, filters=None,
+ partitioning="hive", read_dictionary=None, buffer_size=None,
+ memory_map=False, **kwargs):
+ import pyarrow.dataset as ds
+ import pyarrow.fs
+
+ # Raise error for not supported keywords
+ for keyword, default in [
+ ("schema", None), ("metadata", None),
+ ("split_row_groups", False), ("validate_schema", True),
+ ("metadata_nthreads", 1)]:
+ if keyword in kwargs and kwargs[keyword] is not default:
+ raise ValueError(
+ "Keyword '{0}' is not yet supported with the new "
+ "Dataset API".format(keyword))
+
+ # map old filesystems to new one
+ # TODO(dataset) deal with other file systems
+ if isinstance(filesystem, LocalFileSystem):
+ filesystem = pyarrow.fs.LocalFileSystem(use_mmap=memory_map)
+ elif filesystem is None and memory_map:
+ # if memory_map is specified, assume local file system (string
+ # path can in principle be URI for any filesystem)
+ filesystem = pyarrow.fs.LocalFileSystem(use_mmap=True)
+
+ # map additional arguments
+ read_options = {}
+ if buffer_size:
+ read_options.update(use_buffered_stream=True,
+ buffer_size=buffer_size)
+ if read_dictionary is not None:
+ read_options.update(dictionary_columns=read_dictionary)
+ parquet_format = ds.ParquetFileFormat(read_options=read_options)
+
+ self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
+ format=parquet_format,
+ partitioning=partitioning)
+ self._filters = filters
+ if filters is not None:
+ self._filter_expression = _filters_to_expression(filters)
+ else:
+ self._filter_expression = None
+
+ @property
+ def schema(self):
+ return self._dataset.schema
+
+ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
+ """
+ Read (multiple) Parquet files as a single pyarrow.Table.
+
+ Parameters
+ ----------
+ columns : List[str]
+ Names of columns to read from the dataset.
+ use_threads : bool, default True
+ Perform multi-threaded column reads.
+ use_pandas_metadata : bool, default False
+ If True and file has custom pandas schema metadata, ensure that
+ index columns are also loaded.
+
+ Returns
+ -------
+ pyarrow.Table
+ Content of the file as a table (of columns).
+ """
+ # if use_pandas_metadata, we need to include index columns in the
+ # column selection, to be able to restore those in the pandas DataFrame
+ metadata = self._dataset.schema.metadata
+ if columns is not None and use_pandas_metadata:
+ if metadata and b'pandas' in metadata:
+ index_columns = set(_get_pandas_index_columns(metadata))
+ columns = columns + list(index_columns - set(columns))
+
+ table = self._dataset.to_table(
+ columns=columns, filter=self._filter_expression,
+ use_threads=use_threads
+ )
+
+ # if use_pandas_metadata, restore the pandas metadata (which gets
+ # lost if doing a specific `columns` selection in to_table)
+ if use_pandas_metadata:
+ if metadata and b"pandas" in metadata:
+ new_metadata = table.schema.metadata or {}
+ new_metadata.update({b"pandas": metadata[b"pandas"]})
+ table = table.replace_schema_metadata(new_metadata)
+
+ return table
+
+ def read_pandas(self, **kwargs):
+ """
+ Read dataset including pandas metadata, if any. Other arguments passed
+ through to ParquetDataset.read, see docstring for further details.
+ """
+ return self.read(use_pandas_metadata=True, **kwargs)
+
+ @property
+ def pieces(self):
+ # TODO raise deprecation warning
+ return list(self._dataset.get_fragments())
+
+
_read_table_docstring = """
{0}
@@ -1271,10 +1483,15 @@ metadata : FileMetaData
If separately computed
{1}
filters : List[Tuple] or List[List[Tuple]] or None (default)
- List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This
- implements partition-level (hive) filtering only, i.e., to prevent the
- loading of some files of the dataset if `source` is a directory.
- See the docstring of ParquetDataset for more details.
+ Rows which do not match the filter predicate will be removed from scanned
+ data. Partition keys embedded in a nested directory structure will be
+ exploited to avoid loading files at all if they contain no matching rows.
+ If `use_legacy_dataset` is True, filters can only reference partition
+ keys and only a hive-style directory structure is supported. When
+ setting `use_legacy_dataset` to False, also within-file level filtering
+ and different partitioning schemes are supported.
+
+ {3}
Returns
-------
@@ -1285,12 +1502,32 @@ Returns
def read_table(source, columns=None, use_threads=True, metadata=None,
use_pandas_metadata=False, memory_map=False,
read_dictionary=None, filesystem=None, filters=None,
- buffer_size=0):
+ buffer_size=0, partitioning="hive", use_legacy_dataset=True):
+ if not use_legacy_dataset:
+ if not _is_path_like(source):
+ raise ValueError("File-like objects are not yet supported with "
+ "the new Dataset API")
+
+ dataset = _ParquetDatasetV2(
+ source,
+ filesystem=filesystem,
+ partitioning=partitioning,
+ memory_map=memory_map,
+ read_dictionary=read_dictionary,
+ buffer_size=buffer_size,
+ filters=filters,
+ # unsupported keywords
+ metadata=metadata
+ )
+ return dataset.read(columns=columns, use_threads=use_threads,
+ use_pandas_metadata=use_pandas_metadata)
+
if _is_path_like(source):
pf = ParquetDataset(source, metadata=metadata, memory_map=memory_map,
read_dictionary=read_dictionary,
buffer_size=buffer_size,
- filesystem=filesystem, filters=filters)
+ filesystem=filesystem, filters=filters,
+ partitioning=partitioning)
else:
pf = ParquetFile(source, metadata=metadata,
read_dictionary=read_dictionary,
@@ -1307,11 +1544,13 @@ read_table.__doc__ = _read_table_docstring.format(
If True and file has custom pandas schema metadata, ensure that
index columns are also loaded""")),
"""pyarrow.Table
- Content of the file as a table (of columns)""")
+ Content of the file as a table (of columns)""",
+ _DNF_filter_doc)
def read_pandas(source, columns=None, use_threads=True, memory_map=False,
- metadata=None, filters=None, buffer_size=0):
+ metadata=None, filters=None, buffer_size=0,
+ use_legacy_dataset=True):
return read_table(
source,
columns=columns,
@@ -1321,6 +1560,7 @@ def read_pandas(source, columns=None, use_threads=True,
memory_map=False,
memory_map=memory_map,
buffer_size=buffer_size,
use_pandas_metadata=True,
+ use_legacy_dataset=use_legacy_dataset,
)
@@ -1330,7 +1570,8 @@ read_pandas.__doc__ = _read_table_docstring.format(
_read_docstring_common,
"""pyarrow.Table
Content of the file as a Table of Columns, including DataFrame
- indexes as columns""")
+ indexes as columns""",
+ _DNF_filter_doc)
def write_table(table, where, row_group_size=None, version='1.0',
diff --git a/python/pyarrow/tests/test_parquet.py
b/python/pyarrow/tests/test_parquet.py
index 7e900f0..89131b4 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -56,6 +56,21 @@ def datadir(datadir):
return datadir / 'parquet'
+parametrize_legacy_dataset = pytest.mark.parametrize(
+ "use_legacy_dataset",
+ [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+ "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_skip_buffer = pytest.mark.parametrize(
+ "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+
+
+def deterministic_row_order(use_legacy_dataset, chunk_size=None):
+ # TODO(datasets) ensure to use use_threads=False with the new dataset API
+ # in the tests because otherwise the row order is not deterministic
+ return False if not use_legacy_dataset and chunk_size is not None else True
+
+
def _write_table(table, path, **kwargs):
# So we see the ImportError somewhere
import pyarrow.parquet as pq
@@ -72,18 +87,26 @@ def _read_table(*args, **kwargs):
def _roundtrip_table(table, read_table_kwargs=None,
- write_table_kwargs=None):
+ write_table_kwargs=None, use_legacy_dataset=True):
read_table_kwargs = read_table_kwargs or {}
write_table_kwargs = write_table_kwargs or {}
- buf = io.BytesIO()
- _write_table(table, buf, **write_table_kwargs)
- buf.seek(0)
- return _read_table(buf, **read_table_kwargs)
+ if use_legacy_dataset:
+ buf = io.BytesIO()
+ _write_table(table, buf, **write_table_kwargs)
+ buf.seek(0)
+ return _read_table(buf, **read_table_kwargs)
+ else:
+ from pyarrow.fs import _MockFileSystem
+ mockfs = _MockFileSystem()
+ with mockfs.open_output_stream("test") as out:
+ _write_table(table, out, **write_table_kwargs)
+ return _read_table("test", filesystem=mockfs, use_legacy_dataset=False,
+ **read_table_kwargs)
def _check_roundtrip(table, expected=None, read_table_kwargs=None,
- **write_table_kwargs):
+ use_legacy_dataset=True, **write_table_kwargs):
if expected is None:
expected = table
@@ -91,31 +114,31 @@ def _check_roundtrip(table, expected=None,
read_table_kwargs=None,
# intentionally check twice
result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
- write_table_kwargs=write_table_kwargs)
+ write_table_kwargs=write_table_kwargs,
+ use_legacy_dataset=use_legacy_dataset)
assert result.equals(expected)
result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
- write_table_kwargs=write_table_kwargs)
+ write_table_kwargs=write_table_kwargs,
+ use_legacy_dataset=use_legacy_dataset)
assert result.equals(expected)
-def _roundtrip_pandas_dataframe(df, write_kwargs):
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
table = pa.Table.from_pandas(df)
-
- buf = io.BytesIO()
- _write_table(table, buf, **write_kwargs)
-
- buf.seek(0)
- table1 = _read_table(buf)
- return table1.to_pandas()
+ result = _roundtrip_table(
+ table, write_table_kwargs=write_kwargs,
+ use_legacy_dataset=use_legacy_dataset)
+ return result.to_pandas()
+@parametrize_legacy_dataset
@pytest.mark.parametrize('dtype', [int, float])
-def test_single_pylist_column_roundtrip(tempdir, dtype):
+def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
data = [pa.array(list(map(dtype, range(5))))]
table = pa.Table.from_arrays(data, names=['a'])
_write_table(table, filename)
- table_read = _read_table(filename)
+ table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
for i in range(table.num_columns):
col_written = table[i]
col_read = table_read[i]
@@ -156,8 +179,9 @@ def alltypes_sample(size=10000, seed=0, categorical=False):
@pytest.mark.pandas
+@parametrize_legacy_dataset
@pytest.mark.parametrize('chunk_size', [None, 1000])
-def test_pandas_parquet_2_0_roundtrip(tempdir, chunk_size):
+def test_pandas_parquet_2_0_roundtrip(tempdir, chunk_size, use_legacy_dataset):
df = alltypes_sample(size=10000, categorical=True)
filename = tempdir / 'pandas_roundtrip.parquet'
@@ -166,10 +190,17 @@ def test_pandas_parquet_2_0_roundtrip(tempdir,
chunk_size):
_write_table(arrow_table, filename, version="2.0",
coerce_timestamps='ms', chunk_size=chunk_size)
- table_read = pq.read_pandas(filename)
+ use_threads = deterministic_row_order(use_legacy_dataset, chunk_size)
+ table_read = pq.read_pandas(
+ filename, use_legacy_dataset=use_legacy_dataset,
+ use_threads=use_threads)
assert table_read.schema.pandas_metadata is not None
- assert arrow_table.schema.metadata == table_read.schema.metadata
+ read_metadata = table_read.schema.metadata
+ if not use_legacy_dataset:
+ read_metadata.pop(b"ARROW:schema")
+
+ assert arrow_table.schema.metadata == read_metadata
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@@ -181,74 +212,85 @@ def test_parquet_invalid_version(tempdir):
_write_table(table, tempdir / 'test_version.parquet', version="2.2")
-def test_set_data_page_size():
+@parametrize_legacy_dataset
+def test_set_data_page_size(use_legacy_dataset):
arr = pa.array([1, 2, 3] * 100000)
t = pa.Table.from_arrays([arr], names=['f0'])
# 128K, 512K
page_sizes = [2 << 16, 2 << 18]
for target_page_size in page_sizes:
- _check_roundtrip(t, data_page_size=target_page_size)
+ _check_roundtrip(t, data_page_size=target_page_size,
+ use_legacy_dataset=use_legacy_dataset)
@pytest.mark.pandas
-def test_chunked_table_write():
+@parametrize_legacy_dataset
+def test_chunked_table_write(use_legacy_dataset):
# ARROW-232
df = alltypes_sample(size=10)
batch = pa.RecordBatch.from_pandas(df)
table = pa.Table.from_batches([batch] * 3)
- _check_roundtrip(table, version='2.0')
+ _check_roundtrip(
+ table, version='2.0', use_legacy_dataset=use_legacy_dataset)
df, _ = dataframe_with_lists()
batch = pa.RecordBatch.from_pandas(df)
table = pa.Table.from_batches([batch] * 3)
- _check_roundtrip(table, version='2.0')
+ _check_roundtrip(
+ table, version='2.0', use_legacy_dataset=use_legacy_dataset)
@pytest.mark.pandas
-def test_memory_map(tempdir):
+@parametrize_legacy_dataset
+def test_memory_map(tempdir, use_legacy_dataset):
df = alltypes_sample(size=10)
table = pa.Table.from_pandas(df)
_check_roundtrip(table, read_table_kwargs={'memory_map': True},
- version='2.0')
+ version='2.0', use_legacy_dataset=use_legacy_dataset)
filename = str(tempdir / 'tmp_file')
with open(filename, 'wb') as f:
_write_table(table, f, version='2.0')
- table_read = pq.read_pandas(filename, memory_map=True)
+ table_read = pq.read_pandas(filename, memory_map=True,
+ use_legacy_dataset=use_legacy_dataset)
assert table_read.equals(table)
@pytest.mark.pandas
-def test_enable_buffered_stream(tempdir):
+@parametrize_legacy_dataset
+def test_enable_buffered_stream(tempdir, use_legacy_dataset):
df = alltypes_sample(size=10)
table = pa.Table.from_pandas(df)
_check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
- version='2.0')
+ version='2.0', use_legacy_dataset=use_legacy_dataset)
filename = str(tempdir / 'tmp_file')
with open(filename, 'wb') as f:
_write_table(table, f, version='2.0')
- table_read = pq.read_pandas(filename, buffer_size=4096)
+ table_read = pq.read_pandas(filename, buffer_size=4096,
+ use_legacy_dataset=use_legacy_dataset)
assert table_read.equals(table)
-def test_special_chars_filename(tempdir):
+@parametrize_legacy_dataset
+def test_special_chars_filename(tempdir, use_legacy_dataset):
table = pa.Table.from_arrays([pa.array([42])], ["ints"])
filename = "foo # bar"
path = tempdir / filename
assert not path.exists()
_write_table(table, str(path))
assert path.exists()
- table_read = _read_table(str(path))
+ table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
assert table_read.equals(table)
@pytest.mark.pandas
-def test_empty_table_roundtrip():
+@parametrize_legacy_dataset
+def test_empty_table_roundtrip(use_legacy_dataset):
df = alltypes_sample(size=10)
# Create a non-empty table to infer the types correctly, then slice to 0
@@ -259,24 +301,28 @@ def test_empty_table_roundtrip():
assert table.schema.field('null').type == pa.null()
assert table.schema.field('null_list').type == pa.list_(pa.null())
- _check_roundtrip(table, version='2.0')
+ _check_roundtrip(
+ table, version='2.0', use_legacy_dataset=use_legacy_dataset)
@pytest.mark.pandas
-def test_empty_table_no_columns():
+@parametrize_legacy_dataset
+def test_empty_table_no_columns(use_legacy_dataset):
df = pd.DataFrame()
empty = pa.Table.from_pandas(df, preserve_index=False)
- _check_roundtrip(empty)
+ _check_roundtrip(empty, use_legacy_dataset=use_legacy_dataset)
-def test_empty_lists_table_roundtrip():
+@parametrize_legacy_dataset
+def test_empty_lists_table_roundtrip(use_legacy_dataset):
# ARROW-2744: Shouldn't crash when writing an array of empty lists
arr = pa.array([[], []], type=pa.list_(pa.int32()))
table = pa.Table.from_arrays([arr], ["A"])
- _check_roundtrip(table)
+ _check_roundtrip(table, use_legacy_dataset=use_legacy_dataset)
-def test_nested_list_nonnullable_roundtrip_bug():
+@parametrize_legacy_dataset
+def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
# Reproduce failure in ARROW-5630
typ = pa.list_(pa.field("item", pa.float32(), False))
num_rows = 10000
@@ -284,11 +330,13 @@ def test_nested_list_nonnullable_roundtrip_bug():
pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)]
* (num_rows // 10)), type=typ)
], ['a'])
- _check_roundtrip(t, data_page_size=4096)
+ _check_roundtrip(
+ t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
@pytest.mark.pandas
-def test_pandas_parquet_datetime_tz():
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_parquet_datetime_tz(use_legacy_dataset):
s = pd.Series([datetime.datetime(2017, 9, 6)])
s = s.dt.tz_localize('utc')
@@ -313,12 +361,14 @@ def test_pandas_parquet_datetime_tz():
@pytest.mark.pandas
-def test_datetime_timezone_tzinfo():
+@parametrize_legacy_dataset
+def test_datetime_timezone_tzinfo(use_legacy_dataset):
value = datetime.datetime(2018, 1, 1, 1, 23, 45,
tzinfo=datetime.timezone.utc)
df = pd.DataFrame({'foo': [value]})
- _roundtrip_pandas_dataframe(df, write_kwargs={})
+ _roundtrip_pandas_dataframe(
+ df, write_kwargs={}, use_legacy_dataset=use_legacy_dataset)
@pytest.mark.pandas
@@ -342,7 +392,8 @@ def test_pandas_parquet_custom_metadata(tempdir):
@pytest.mark.pandas
-def test_pandas_parquet_column_multiindex(tempdir):
+@parametrize_legacy_dataset
+def test_pandas_parquet_column_multiindex(tempdir, use_legacy_dataset):
df = alltypes_sample(size=10)
df.columns = pd.MultiIndex.from_tuples(
list(zip(df.columns, df.columns[::-1])),
@@ -355,13 +406,17 @@ def test_pandas_parquet_column_multiindex(tempdir):
_write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
- table_read = pq.read_pandas(filename)
+ table_read = pq.read_pandas(
+ filename, use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
-def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(tempdir):
+@parametrize_legacy_dataset
+def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(
+ tempdir, use_legacy_dataset
+):
df = alltypes_sample(size=10000)
filename = tempdir / 'pandas_roundtrip.parquet'
@@ -373,19 +428,25 @@ def
test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(tempdir):
assert js['columns']
_write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
- table_read = pq.read_pandas(filename)
+ table_read = pq.read_pandas(
+ filename, use_legacy_dataset=use_legacy_dataset)
js = table_read.schema.pandas_metadata
assert not js['index_columns']
- assert arrow_table.schema.metadata == table_read.schema.metadata
+ read_metadata = table_read.schema.metadata
+ if not use_legacy_dataset:
+ read_metadata.pop(b"ARROW:schema")
+
+ assert arrow_table.schema.metadata == read_metadata
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
-def test_pandas_parquet_1_0_roundtrip(tempdir):
+@parametrize_legacy_dataset
+def test_pandas_parquet_1_0_roundtrip(tempdir, use_legacy_dataset):
size = 10000
np.random.seed(0)
df = pd.DataFrame({
@@ -407,7 +468,7 @@ def test_pandas_parquet_1_0_roundtrip(tempdir):
filename = tempdir / 'pandas_roundtrip.parquet'
arrow_table = pa.Table.from_pandas(df)
_write_table(arrow_table, filename, version='1.0')
- table_read = _read_table(filename)
+ table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
# We pass uint32_t as int64_t if we write Parquet version 1.0
@@ -417,12 +478,13 @@ def test_pandas_parquet_1_0_roundtrip(tempdir):
@pytest.mark.pandas
-def test_multiple_path_types(tempdir):
+@parametrize_legacy_dataset
+def test_multiple_path_types(tempdir, use_legacy_dataset):
# Test compatibility with PEP 519 path-like objects
path = tempdir / 'zzz.parquet'
df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
_write_table(df, path)
- table_read = _read_table(path)
+ table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@@ -430,13 +492,15 @@ def test_multiple_path_types(tempdir):
path = str(tempdir) + 'zzz.parquet'
df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
_write_table(df, path)
- table_read = _read_table(path)
+ table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
+# TODO(dataset) duplicate column selection actually gives duplicate columns now
@pytest.mark.pandas
-def test_pandas_column_selection(tempdir):
+@parametrize_legacy_dataset_not_supported
+def test_pandas_column_selection(tempdir, use_legacy_dataset):
size = 10000
np.random.seed(0)
df = pd.DataFrame({
@@ -446,14 +510,17 @@ def test_pandas_column_selection(tempdir):
filename = tempdir / 'pandas_roundtrip.parquet'
arrow_table = pa.Table.from_pandas(df)
_write_table(arrow_table, filename)
- table_read = _read_table(filename, columns=['uint8'])
+ table_read = _read_table(
+ filename, columns=['uint8'], use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df[['uint8']], df_read)
# ARROW-4267: Selection of duplicate columns still leads to these columns
# being read uniquely.
- table_read = _read_table(filename, columns=['uint8', 'uint8'])
+ table_read = _read_table(
+ filename, columns=['uint8', 'uint8'],
+ use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df[['uint8']], df_read)
@@ -491,20 +558,24 @@ def _test_dataframe(size=10000, seed=0):
return df
+# TODO(ARROW-8074) NativeFile support
@pytest.mark.pandas
-def test_pandas_parquet_native_file_roundtrip(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_parquet_native_file_roundtrip(tempdir, use_legacy_dataset):
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version="2.0")
buf = imos.getvalue()
reader = pa.BufferReader(buf)
- df_read = _read_table(reader).to_pandas()
+ df_read = _read_table(
+ reader, use_legacy_dataset=use_legacy_dataset).to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
-def test_parquet_incremental_file_build(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
df = _test_dataframe(100)
df['unique_id'] = 0
@@ -524,33 +595,40 @@ def test_parquet_incremental_file_build(tempdir):
writer.close()
buf = out.getvalue()
- result = _read_table(pa.BufferReader(buf))
+ result = _read_table(
+ pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
expected = pd.concat(frames, ignore_index=True)
tm.assert_frame_equal(result.to_pandas(), expected)
@pytest.mark.pandas
-def test_read_pandas_column_subset(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_read_pandas_column_subset(tempdir, use_legacy_dataset):
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version="2.0")
buf = imos.getvalue()
reader = pa.BufferReader(buf)
- df_read = pq.read_pandas(reader, columns=['strings', 'uint8']).to_pandas()
+ df_read = pq.read_pandas(
+ reader, columns=['strings', 'uint8'],
+ use_legacy_dataset=use_legacy_dataset
+ ).to_pandas()
tm.assert_frame_equal(df[['strings', 'uint8']], df_read)
@pytest.mark.pandas
-def test_pandas_parquet_empty_roundtrip(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_parquet_empty_roundtrip(tempdir, use_legacy_dataset):
df = _test_dataframe(0)
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version="2.0")
buf = imos.getvalue()
reader = pa.BufferReader(buf)
- df_read = _read_table(reader).to_pandas()
+ df_read = _read_table(
+ reader, use_legacy_dataset=use_legacy_dataset).to_pandas()
tm.assert_frame_equal(df, df_read)
@@ -580,7 +658,8 @@ def test_pandas_can_write_nested_data(tempdir):
@pytest.mark.pandas
-def test_pandas_parquet_pyfile_roundtrip(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset):
filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
size = 5
df = pd.DataFrame({
@@ -598,13 +677,14 @@ def test_pandas_parquet_pyfile_roundtrip(tempdir):
data = io.BytesIO(filename.read_bytes())
- table_read = _read_table(data)
+ table_read = _read_table(data, use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
-def test_pandas_parquet_configuration_options(tempdir):
+@parametrize_legacy_dataset
+def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
size = 10000
np.random.seed(0)
df = pd.DataFrame({
@@ -626,14 +706,16 @@ def test_pandas_parquet_configuration_options(tempdir):
for use_dictionary in [True, False]:
_write_table(arrow_table, filename, version='2.0',
use_dictionary=use_dictionary)
- table_read = _read_table(filename)
+ table_read = _read_table(
+ filename, use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
for write_statistics in [True, False]:
_write_table(arrow_table, filename, version='2.0',
write_statistics=write_statistics)
- table_read = _read_table(filename)
+ table_read = _read_table(filename,
+ use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@@ -643,7 +725,8 @@ def test_pandas_parquet_configuration_options(tempdir):
continue
_write_table(arrow_table, filename, version='2.0',
compression=compression)
- table_read = _read_table(filename)
+ table_read = _read_table(
+ filename, use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@@ -662,7 +745,8 @@ def make_sample_file(table_or_df):
return pq.ParquetFile(buf)
-def test_byte_stream_split():
+@parametrize_legacy_dataset
+def test_byte_stream_split(use_legacy_dataset):
# This is only a smoke test.
arr_float = pa.array(list(map(float, range(100))))
arr_int = pa.array(list(map(int, range(100))))
@@ -696,26 +780,31 @@ def test_byte_stream_split():
table = pa.Table.from_arrays([arr_int], names=['tmp'])
with pytest.raises(IOError):
_check_roundtrip(table, expected=table, use_byte_stream_split=True,
- use_dictionary=False)
+ use_dictionary=False,
+ use_legacy_dataset=use_legacy_dataset)
-def test_compression_level():
+@parametrize_legacy_dataset
+def test_compression_level(use_legacy_dataset):
arr = pa.array(list(map(int, range(1000))))
data = [arr, arr]
table = pa.Table.from_arrays(data, names=['a', 'b'])
# Check one compression level.
_check_roundtrip(table, expected=table, compression="gzip",
- compression_level=1)
+ compression_level=1,
+ use_legacy_dataset=use_legacy_dataset)
# Check another one to make sure that compression_level=1 does not
# coincide with the default one in Arrow.
_check_roundtrip(table, expected=table, compression="gzip",
- compression_level=5)
+ compression_level=5,
+ use_legacy_dataset=use_legacy_dataset)
# Check that the user can provide a compression level per column
_check_roundtrip(table, expected=table, compression="gzip",
- compression_level={'a': 2, 'b': 3})
+ compression_level={'a': 2, 'b': 3},
+ use_legacy_dataset=use_legacy_dataset)
# Check that specifying a compression level for a codec which does allow
# specifying one, results into an error.
@@ -1356,7 +1445,8 @@ def test_fixed_size_binary():
@pytest.mark.pandas
-def test_multithreaded_read():
+@parametrize_legacy_dataset_skip_buffer
+def test_multithreaded_read(use_legacy_dataset):
df = alltypes_sample(size=10000)
table = pa.Table.from_pandas(df)
@@ -1365,16 +1455,19 @@ def test_multithreaded_read():
_write_table(table, buf, compression='SNAPPY', version='2.0')
buf.seek(0)
- table1 = _read_table(buf, use_threads=True)
+ table1 = _read_table(
+ buf, use_threads=True, use_legacy_dataset=use_legacy_dataset)
buf.seek(0)
- table2 = _read_table(buf, use_threads=False)
+ table2 = _read_table(
+ buf, use_threads=False, use_legacy_dataset=use_legacy_dataset)
assert table1.equals(table2)
@pytest.mark.pandas
-def test_min_chunksize():
+@parametrize_legacy_dataset_skip_buffer
+def test_min_chunksize(use_legacy_dataset):
data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
table = pa.Table.from_pandas(data.reset_index())
@@ -1382,7 +1475,7 @@ def test_min_chunksize():
_write_table(table, buf, chunk_size=-1)
buf.seek(0)
- result = _read_table(buf)
+ result = _read_table(buf, use_legacy_dataset=use_legacy_dataset)
assert result.equals(table)
@@ -1581,9 +1674,10 @@ def test_partition_set_dictionary_type():
@pytest.mark.pandas
-def test_read_partitioned_directory(tempdir):
+@parametrize_legacy_dataset
+def test_read_partitioned_directory(tempdir, use_legacy_dataset):
fs = LocalFileSystem.get_instance()
- _partition_test_for_filesystem(fs, tempdir)
+ _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset)
@pytest.mark.pandas
@@ -1604,7 +1698,8 @@ def test_create_parquet_dataset_multi_threaded(tempdir):
@pytest.mark.pandas
-def test_equivalency(tempdir):
+@parametrize_legacy_dataset
+def test_equivalency(tempdir, use_legacy_dataset):
fs = LocalFileSystem.get_instance()
base_path = tempdir
@@ -1631,7 +1726,8 @@ def test_equivalency(tempdir):
dataset = pq.ParquetDataset(
base_path, filesystem=fs,
filters=[('integer', '=', 1), ('string', '!=', 'b'),
- ('boolean', '==', True)]
+ ('boolean', '==', True)],
+ use_legacy_dataset=use_legacy_dataset,
)
table = dataset.read()
result_df = (table.to_pandas().reset_index(drop=True))
@@ -1652,7 +1748,9 @@ def test_equivalency(tempdir):
],
[('integer', '=', 0), ('boolean', '==', 'False')]
]
- dataset = pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs, filters=filters,
+ use_legacy_dataset=use_legacy_dataset)
table = dataset.read()
result_df = table.to_pandas().reset_index(drop=True)
@@ -1668,19 +1766,28 @@ def test_equivalency(tempdir):
assert df_filter_2.sum() > 0
assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())
- # Check for \0 in predicate values. Until they are correctly implemented
- # in ARROW-3391, they would otherwise lead to weird results with the
- # current code.
- with pytest.raises(NotImplementedError):
- filters = [[('string', '==', b'1\0a')]]
- pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
- with pytest.raises(NotImplementedError):
- filters = [[('string', '==', '1\0a')]]
- pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+ if use_legacy_dataset:
+ # Check for \0 in predicate values. Until they are correctly
+ # implemented in ARROW-3391, they would otherwise lead to weird
+ # results with the current code.
+ with pytest.raises(NotImplementedError):
+ filters = [[('string', '==', b'1\0a')]]
+ pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+ with pytest.raises(NotImplementedError):
+ filters = [[('string', '==', '1\0a')]]
+ pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+ else:
+ for filters in [[[('string', '==', b'1\0a')]],
+ [[('string', '==', '1\0a')]]]:
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs, filters=filters,
+ use_legacy_dataset=False)
+ assert dataset.read().num_rows == 0
@pytest.mark.pandas
-def test_cutoff_exclusive_integer(tempdir):
+@parametrize_legacy_dataset
+def test_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
fs = LocalFileSystem.get_instance()
base_path = tempdir
@@ -1702,7 +1809,8 @@ def test_cutoff_exclusive_integer(tempdir):
filters=[
('integers', '<', 4),
('integers', '>', 1),
- ]
+ ],
+ use_legacy_dataset=use_legacy_dataset
)
table = dataset.read()
result_df = (table.to_pandas()
@@ -1714,11 +1822,14 @@ def test_cutoff_exclusive_integer(tempdir):
@pytest.mark.pandas
+@parametrize_legacy_dataset
@pytest.mark.xfail(
- raises=TypeError,
+ # different error with use_legacy_dataset=False because result_df is no
+ # longer categorical
+ raises=(TypeError, AssertionError),
reason='Loss of type information in creation of categoricals.'
)
-def test_cutoff_exclusive_datetime(tempdir):
+def test_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
fs = LocalFileSystem.get_instance()
base_path = tempdir
@@ -1746,7 +1857,8 @@ def test_cutoff_exclusive_datetime(tempdir):
filters=[
('dates', '<', "2018-04-12"),
('dates', '>', "2018-04-10")
- ]
+ ],
+ use_legacy_dataset=use_legacy_dataset
)
table = dataset.read()
result_df = (table.to_pandas()
@@ -1761,7 +1873,8 @@ def test_cutoff_exclusive_datetime(tempdir):
@pytest.mark.pandas
-def test_inclusive_integer(tempdir):
+@parametrize_legacy_dataset
+def test_inclusive_integer(tempdir, use_legacy_dataset):
fs = LocalFileSystem.get_instance()
base_path = tempdir
@@ -1783,7 +1896,8 @@ def test_inclusive_integer(tempdir):
filters=[
('integers', '<=', 3),
('integers', '>=', 2),
- ]
+ ],
+ use_legacy_dataset=use_legacy_dataset
)
table = dataset.read()
result_df = (table.to_pandas()
@@ -1795,7 +1909,8 @@ def test_inclusive_integer(tempdir):
@pytest.mark.pandas
-def test_inclusive_set(tempdir):
+@parametrize_legacy_dataset
+def test_inclusive_set(tempdir, use_legacy_dataset):
fs = LocalFileSystem.get_instance()
base_path = tempdir
@@ -1820,7 +1935,8 @@ def test_inclusive_set(tempdir):
dataset = pq.ParquetDataset(
base_path, filesystem=fs,
filters=[('integer', 'in', {1}), ('string', 'in', {'a', 'b'}),
- ('boolean', 'in', {True})]
+ ('boolean', 'in', {True})],
+ use_legacy_dataset=use_legacy_dataset
)
table = dataset.read()
result_df = (table.to_pandas().reset_index(drop=True))
@@ -1831,7 +1947,8 @@ def test_inclusive_set(tempdir):
@pytest.mark.pandas
-def test_invalid_pred_op(tempdir):
+@parametrize_legacy_dataset
+def test_invalid_pred_op(tempdir, use_legacy_dataset):
fs = LocalFileSystem.get_instance()
base_path = tempdir
@@ -1851,23 +1968,29 @@ def test_invalid_pred_op(tempdir):
with pytest.raises(ValueError):
pq.ParquetDataset(base_path,
filesystem=fs,
- filters=[
- ('integers', '=<', 3),
- ])
-
- with pytest.raises(ValueError):
- pq.ParquetDataset(base_path,
- filesystem=fs,
- filters=[
- ('integers', 'in', set()),
- ])
+ filters=[('integers', '=<', 3), ],
+ use_legacy_dataset=use_legacy_dataset)
- with pytest.raises(ValueError):
+ if use_legacy_dataset:
+ with pytest.raises(ValueError):
+ pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[('integers', 'in', set()), ],
+ use_legacy_dataset=use_legacy_dataset)
+ else:
+ # Dataset API returns empty table instead
+ dataset = pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[('integers', 'in', set()), ],
+ use_legacy_dataset=use_legacy_dataset)
+ assert dataset.read().num_rows == 0
+
+ with pytest.raises(ValueError if use_legacy_dataset else TypeError):
+ # dataset API returns TypeError when trying to create an invalid comparison
pq.ParquetDataset(base_path,
filesystem=fs,
- filters=[
- ('integers', '!=', {3}),
- ])
+ filters=[('integers', '!=', {3})],
+ use_legacy_dataset=use_legacy_dataset)
@pytest.mark.pandas
@@ -1956,7 +2079,7 @@ def test_read_partitioned_directory_s3fs(s3_example):
dataset.read()
-def _partition_test_for_filesystem(fs, base_path):
+def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
foo_keys = [0, 1]
bar_keys = ['a', 'b', 'c']
partition_spec = [
@@ -1974,7 +2097,8 @@ def _partition_test_for_filesystem(fs, base_path):
_generate_partition_directories(fs, base_path, partition_spec, df)
- dataset = pq.ParquetDataset(base_path, filesystem=fs)
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset)
table = dataset.read()
result_df = (table.to_pandas()
.sort_values(by='index')
@@ -1983,8 +2107,11 @@ def _partition_test_for_filesystem(fs, base_path):
expected_df = (df.sort_values(by='index')
.reset_index(drop=True)
.reindex(columns=result_df.columns))
- expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
- expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
+ if use_legacy_dataset:
+ # TODO(dataset) Dataset API does not create categorical columns
+ # for partition keys
+ expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+ expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
@@ -2133,7 +2260,8 @@ def _filter_partition(df, part_keys):
@pytest.mark.pandas
-def test_read_multiple_files(tempdir):
+@parametrize_legacy_dataset
+def test_read_multiple_files(tempdir, use_legacy_dataset):
nfiles = 10
size = 5
@@ -2159,8 +2287,11 @@ def test_read_multiple_files(tempdir):
# Write a _SUCCESS.crc file
(dirpath / '_SUCCESS.crc').touch()
- def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
- dataset = pq.ParquetDataset(paths, **kwargs)
+ # TODO(dataset) changed to use_threads=False because otherwise the
+ # row order is not deterministic
+ def read_multiple_files(paths, columns=None, use_threads=False, **kwargs):
+ dataset = pq.ParquetDataset(
+ paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
return dataset.read(columns=columns, use_threads=use_threads)
result = read_multiple_files(paths)
@@ -2169,13 +2300,15 @@ def test_read_multiple_files(tempdir):
assert result.equals(expected)
# Read with provided metadata
- metadata = pq.read_metadata(paths[0])
+ # TODO(dataset) specifying metadata not yet supported
+ if use_legacy_dataset:
+ metadata = pq.read_metadata(paths[0])
- result2 = read_multiple_files(paths, metadata=metadata)
- assert result2.equals(expected)
+ result2 = read_multiple_files(paths, metadata=metadata)
+ assert result2.equals(expected)
- result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
- assert result3.equals(expected)
+ result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
+ assert result3.equals(expected)
# Read column subset
to_read = [0, 2, 6, result.num_columns - 1]
@@ -2197,6 +2330,10 @@ def test_read_multiple_files(tempdir):
t = pa.Table.from_pandas(bad_apple)
_write_table(t, bad_apple_path)
+ if not use_legacy_dataset:
+ # TODO(dataset) Dataset API skips bad files
+ return
+
bad_meta = pq.read_metadata(bad_apple_path)
with pytest.raises(ValueError):
@@ -2215,7 +2352,8 @@ def test_read_multiple_files(tempdir):
@pytest.mark.pandas
-def test_dataset_read_pandas(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_read_pandas(tempdir, use_legacy_dataset):
nfiles = 5
size = 5
@@ -2238,7 +2376,7 @@ def test_dataset_read_pandas(tempdir):
frames.append(df)
paths.append(path)
- dataset = pq.ParquetDataset(dirpath)
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
columns = ['uint8', 'strings']
result = dataset.read_pandas(columns=columns).to_pandas()
expected = pd.concat([x[columns] for x in frames])
@@ -2247,7 +2385,8 @@ def test_dataset_read_pandas(tempdir):
@pytest.mark.pandas
-def test_dataset_memory_map(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_memory_map(tempdir, use_legacy_dataset):
# ARROW-2627: Check that we can use ParquetDataset with memory-mapping
dirpath = tempdir / guid()
dirpath.mkdir()
@@ -2257,12 +2396,16 @@ def test_dataset_memory_map(tempdir):
table = pa.Table.from_pandas(df)
_write_table(table, path, version='2.0')
- dataset = pq.ParquetDataset(dirpath, memory_map=True)
- assert dataset.pieces[0].read().equals(table)
+ dataset = pq.ParquetDataset(
+ dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset)
+ assert dataset.read().equals(table)
+ if use_legacy_dataset:
+ assert dataset.pieces[0].read().equals(table)
@pytest.mark.pandas
-def test_dataset_enable_buffered_stream(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
dirpath = tempdir / guid()
dirpath.mkdir()
@@ -2272,11 +2415,15 @@ def test_dataset_enable_buffered_stream(tempdir):
_write_table(table, path, version='2.0')
with pytest.raises(ValueError):
- pq.ParquetDataset(dirpath, buffer_size=-64)
+ pq.ParquetDataset(
+ dirpath, buffer_size=-64,
+ use_legacy_dataset=use_legacy_dataset)
for buffer_size in [128, 1024]:
- dataset = pq.ParquetDataset(dirpath, buffer_size=buffer_size)
- assert dataset.pieces[0].read().equals(table)
+ dataset = pq.ParquetDataset(
+ dirpath, buffer_size=buffer_size,
+ use_legacy_dataset=use_legacy_dataset)
+ assert dataset.read().equals(table)
@pytest.mark.pandas
@@ -2336,9 +2483,18 @@ def _make_example_multifile_dataset(base_path,
nfiles=10, file_nrows=5):
return paths
+def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
+ if use_legacy_dataset:
+ assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+ else:
+ paths = [str(path.as_posix()) for path in paths]
+ assert set(paths) == set(dataset._dataset.files)
+
+
@pytest.mark.pandas
+@parametrize_legacy_dataset
@pytest.mark.parametrize('dir_prefix', ['_', '.'])
-def test_ignore_private_directories(tempdir, dir_prefix):
+def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset):
dirpath = tempdir / guid()
dirpath.mkdir()
@@ -2348,12 +2504,14 @@ def test_ignore_private_directories(tempdir, dir_prefix):
# private directory
(dirpath / '{}staging'.format(dir_prefix)).mkdir()
- dataset = pq.ParquetDataset(dirpath)
- assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
@pytest.mark.pandas
-def test_ignore_hidden_files_dot(tempdir):
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset):
dirpath = tempdir / guid()
dirpath.mkdir()
@@ -2366,12 +2524,14 @@ def test_ignore_hidden_files_dot(tempdir):
with (dirpath / '.private').open('wb') as f:
f.write(b'gibberish')
- dataset = pq.ParquetDataset(dirpath)
- assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
@pytest.mark.pandas
-def test_ignore_hidden_files_underscore(tempdir):
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset):
dirpath = tempdir / guid()
dirpath.mkdir()
@@ -2384,12 +2544,14 @@ def test_ignore_hidden_files_underscore(tempdir):
with (dirpath / '_started_321').open('wb') as f:
f.write(b'abcd')
- dataset = pq.ParquetDataset(dirpath)
- assert set(map(str, paths)) == {x.path for x in dataset.pieces}
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
@pytest.mark.pandas
-def test_multiindex_duplicate_values(tempdir):
+@parametrize_legacy_dataset
+def test_multiindex_duplicate_values(tempdir, use_legacy_dataset):
num_rows = 3
numbers = list(range(num_rows))
index = pd.MultiIndex.from_arrays(
@@ -2403,7 +2565,7 @@ def test_multiindex_duplicate_values(tempdir):
filename = tempdir / 'dup_multi_index_levels.parquet'
_write_table(table, filename)
- result_table = _read_table(filename)
+ result_table = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
assert table.equals(result_table)
result_df = result_table.to_pandas()
@@ -2460,22 +2622,26 @@ def test_noncoerced_nanoseconds_written_without_exception(tempdir):
pq.write_table(tb, filename, coerce_timestamps='ms', version='2.0')
-def test_read_non_existent_file(tempdir):
+@parametrize_legacy_dataset
+def test_read_non_existent_file(tempdir, use_legacy_dataset):
path = 'non-existent-file.parquet'
try:
- pq.read_table(path)
+ pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
except Exception as e:
assert path in e.args[0]
-def test_read_table_doesnt_warn(datadir):
+@parametrize_legacy_dataset
+def test_read_table_doesnt_warn(datadir, use_legacy_dataset):
with pytest.warns(None) as record:
- pq.read_table(datadir / 'v0.7.1.parquet')
+ pq.read_table(datadir / 'v0.7.1.parquet',
+ use_legacy_dataset=use_legacy_dataset)
assert len(record) == 0
def _test_write_to_dataset_with_partitions(base_path,
+ use_legacy_dataset=True,
filesystem=None,
schema=None,
index_name=None):
@@ -2505,12 +2671,19 @@ def _test_write_to_dataset_with_partitions(base_path,
# partitioned dataset
dataset = pq.ParquetDataset(base_path,
filesystem=filesystem,
- validate_schema=True)
+ validate_schema=True,
+ use_legacy_dataset=use_legacy_dataset)
# ARROW-2209: Ensure the dataset schema also includes the partition columns
- dataset_cols = set(dataset.schema.to_arrow_schema().names)
+ if use_legacy_dataset:
+ dataset_cols = set(dataset.schema.to_arrow_schema().names)
+ else:
+ # NB schema property is an arrow and not parquet schema
+ dataset_cols = set(dataset.schema.names)
+
assert dataset_cols == set(output_table.schema.names)
- input_table = dataset.read()
+ use_threads = deterministic_row_order(use_legacy_dataset)
+ input_table = dataset.read(use_threads=use_threads)
input_df = input_table.to_pandas()
# Read data back in and compare with original DataFrame
@@ -2520,12 +2693,15 @@ def _test_write_to_dataset_with_partitions(base_path,
# Partitioned columns become 'categorical' dtypes
input_df = input_df[cols]
- for col in partition_by:
- output_df[col] = output_df[col].astype('category')
+ if use_legacy_dataset:
+ for col in partition_by:
+ output_df[col] = output_df[col].astype('category')
assert output_df.equals(input_df)
-def _test_write_to_dataset_no_partitions(base_path, filesystem=None):
+def _test_write_to_dataset_no_partitions(base_path,
+ use_legacy_dataset=True,
+ filesystem=None):
# ARROW-1400
output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
'group2': list('eefeffgeee'),
@@ -2549,8 +2725,10 @@ def _test_write_to_dataset_no_partitions(base_path, filesystem=None):
# Deduplicated incoming DataFrame should match
# original outgoing Dataframe
- input_table = pq.ParquetDataset(base_path,
- filesystem=filesystem).read()
+ input_table = pq.ParquetDataset(
+ base_path, filesystem=filesystem,
+ use_legacy_dataset=use_legacy_dataset
+ ).read()
input_df = input_table.to_pandas()
input_df = input_df.drop_duplicates()
input_df = input_df[cols]
@@ -2558,28 +2736,37 @@ def _test_write_to_dataset_no_partitions(base_path, filesystem=None):
@pytest.mark.pandas
-def test_write_to_dataset_with_partitions(tempdir):
- _test_write_to_dataset_with_partitions(str(tempdir))
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset):
+ _test_write_to_dataset_with_partitions(str(tempdir), use_legacy_dataset)
@pytest.mark.pandas
-def test_write_to_dataset_with_partitions_and_schema(tempdir):
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_schema(
+ tempdir, use_legacy_dataset
+):
schema = pa.schema([pa.field('group1', type=pa.string()),
pa.field('group2', type=pa.string()),
pa.field('num', type=pa.int64()),
pa.field('nan', type=pa.int32()),
pa.field('date', type=pa.timestamp(unit='us'))])
- _test_write_to_dataset_with_partitions(str(tempdir), schema=schema)
+ _test_write_to_dataset_with_partitions(
+ str(tempdir), use_legacy_dataset, schema=schema)
@pytest.mark.pandas
-def test_write_to_dataset_with_partitions_and_index_name(tempdir):
- _test_write_to_dataset_with_partitions(str(tempdir),
- index_name='index_name')
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_index_name(
+ tempdir, use_legacy_dataset
+):
+ _test_write_to_dataset_with_partitions(
+ str(tempdir), use_legacy_dataset, index_name='index_name')
@pytest.mark.pandas
-def test_write_to_dataset_no_partitions(tempdir):
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset):
_test_write_to_dataset_no_partitions(str(tempdir))
@@ -2628,6 +2815,7 @@ def test_large_table_int32_overflow():
_write_table(table, f)
+# TODO(ARROW-8074) buffer support
def _simple_table_roundtrip(table, **write_kwargs):
stream = pa.BufferOutputStream()
_write_table(table, stream, **write_kwargs)
@@ -2696,7 +2884,8 @@ def test_list_of_binary_large_cell():
@pytest.mark.pandas
-def test_index_column_name_duplicate(tempdir):
+@parametrize_legacy_dataset
+def test_index_column_name_duplicate(tempdir, use_legacy_dataset):
data = {
'close': {
pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
@@ -2715,13 +2904,14 @@ def test_index_column_name_duplicate(tempdir):
dfx = pd.DataFrame(data).set_index('time', drop=False)
tdfx = pa.Table.from_pandas(dfx)
_write_table(tdfx, path)
- arrow_table = _read_table(path)
+ arrow_table = _read_table(path, use_legacy_dataset=use_legacy_dataset)
result_df = arrow_table.to_pandas()
tm.assert_frame_equal(result_df, dfx)
@pytest.mark.pandas
-def test_parquet_nested_convenience(tempdir):
+@parametrize_legacy_dataset
+def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
# ARROW-1684
df = pd.DataFrame({
'a': [[1, 2, 3], None, [4, 5], []],
@@ -2733,15 +2923,18 @@ def test_parquet_nested_convenience(tempdir):
table = pa.Table.from_pandas(df, preserve_index=False)
_write_table(table, path)
- read = pq.read_table(path, columns=['a'])
+ read = pq.read_table(
+ path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
tm.assert_frame_equal(read.to_pandas(), df[['a']])
- read = pq.read_table(path, columns=['a', 'b'])
+ read = pq.read_table(
+ path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
tm.assert_frame_equal(read.to_pandas(), df)
@pytest.mark.pandas
-def test_backwards_compatible_index_naming(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_naming(datadir, use_legacy_dataset):
expected_string = b"""\
carat cut color clarity depth table price x y z
0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
@@ -2756,13 +2949,17 @@ carat cut color clarity depth table price
x y z
0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39"""
expected = pd.read_csv(io.BytesIO(expected_string), sep=r'\s{2,}',
index_col=None, header=0, engine='python')
- table = _read_table(datadir / 'v0.7.1.parquet')
+ table = _read_table(
+ datadir / 'v0.7.1.parquet', use_legacy_dataset=use_legacy_dataset)
result = table.to_pandas()
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
-def test_backwards_compatible_index_multi_level_named(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_multi_level_named(
+ datadir, use_legacy_dataset
+):
expected_string = b"""\
carat cut color clarity depth table price x y z
0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
@@ -2781,13 +2978,17 @@ carat cut color clarity depth table price
x y z
header=0, engine='python'
).sort_index()
- table = _read_table(datadir / 'v0.7.1.all-named-index.parquet')
+ table = _read_table(datadir / 'v0.7.1.all-named-index.parquet',
+ use_legacy_dataset=use_legacy_dataset)
result = table.to_pandas()
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
-def test_backwards_compatible_index_multi_level_some_named(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_multi_level_some_named(
+ datadir, use_legacy_dataset
+):
expected_string = b"""\
carat cut color clarity depth table price x y z
0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
@@ -2807,13 +3008,17 @@ carat cut color clarity depth table price
x y z
).sort_index()
expected.index = expected.index.set_names(['cut', None, 'clarity'])
- table = _read_table(datadir / 'v0.7.1.some-named-index.parquet')
+ table = _read_table(datadir / 'v0.7.1.some-named-index.parquet',
+ use_legacy_dataset=use_legacy_dataset)
result = table.to_pandas()
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
-def test_backwards_compatible_column_metadata_handling(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_column_metadata_handling(
+ datadir, use_legacy_dataset
+):
expected = pd.DataFrame(
{'a': [1, 2, 3], 'b': [.1, .2, .3],
'c': pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')})
@@ -2823,15 +3028,17 @@ def test_backwards_compatible_column_metadata_handling(datadir):
names=['index', None])
path = datadir / 'v0.7.1.column-metadata-handling.parquet'
- table = _read_table(path)
+ table = _read_table(path, use_legacy_dataset=use_legacy_dataset)
result = table.to_pandas()
tm.assert_frame_equal(result, expected)
- table = _read_table(path, columns=['a'])
+ table = _read_table(
+ path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
result = table.to_pandas()
tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True))
+# TODO(dataset) support pickling
def _make_dataset_for_pickling(tempdir, N=100):
path = tempdir / 'data.parquet'
fs = LocalFileSystem.get_instance()
@@ -2894,7 +3101,8 @@ def test_cloudpickle_dataset(tempdir, datadir):
@pytest.mark.pandas
-def test_decimal_roundtrip(tempdir):
+@parametrize_legacy_dataset
+def test_decimal_roundtrip(tempdir, use_legacy_dataset):
num_values = 10
columns = {}
@@ -2914,7 +3122,8 @@ def test_decimal_roundtrip(tempdir):
string_filename = str(filename)
table = pa.Table.from_pandas(expected)
_write_table(table, string_filename)
- result_table = _read_table(string_filename)
+ result_table = _read_table(
+ string_filename, use_legacy_dataset=use_legacy_dataset)
result = result_table.to_pandas()
tm.assert_frame_equal(result, expected)
@@ -2935,7 +3144,8 @@ def test_decimal_roundtrip_negative_scale(tempdir):
@pytest.mark.pandas
-def test_parquet_writer_context_obj(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_context_obj(tempdir, use_legacy_dataset):
df = _test_dataframe(100)
df['unique_id'] = 0
@@ -2953,14 +3163,18 @@ def test_parquet_writer_context_obj(tempdir):
frames.append(df.copy())
buf = out.getvalue()
- result = _read_table(pa.BufferReader(buf))
+ result = _read_table(
+ pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
expected = pd.concat(frames, ignore_index=True)
tm.assert_frame_equal(result.to_pandas(), expected)
@pytest.mark.pandas
-def test_parquet_writer_context_obj_with_exception(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_context_obj_with_exception(
+ tempdir, use_legacy_dataset
+):
df = _test_dataframe(100)
df['unique_id'] = 0
@@ -2985,21 +3199,23 @@ def
test_parquet_writer_context_obj_with_exception(tempdir):
assert str(e) == error_text
buf = out.getvalue()
- result = _read_table(pa.BufferReader(buf))
+ result = _read_table(
+ pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
expected = pd.concat(frames, ignore_index=True)
tm.assert_frame_equal(result.to_pandas(), expected)
@pytest.mark.pandas
-def test_zlib_compression_bug():
+@parametrize_legacy_dataset_skip_buffer
+def test_zlib_compression_bug(use_legacy_dataset):
# ARROW-3514: "zlib deflate failed, output buffer too small"
table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
f = io.BytesIO()
pq.write_table(table, f, compression='gzip')
f.seek(0)
- roundtrip = pq.read_table(f)
+ roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())
@@ -3051,7 +3267,8 @@ def test_empty_row_groups(tempdir):
@pytest.mark.pandas
-def test_parquet_writer_with_caller_provided_filesystem():
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):
out = pa.BufferOutputStream()
class CustomFS(FileSystem):
@@ -3078,7 +3295,8 @@ def test_parquet_writer_with_caller_provided_filesystem():
assert out.closed
buf = out.getvalue()
- table_read = _read_table(pa.BufferReader(buf))
+ table_read = _read_table(
+ pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df_read, df)
@@ -3097,7 +3315,8 @@ def test_writing_empty_lists():
_check_roundtrip(table)
-def test_write_nested_zero_length_array_chunk_failure():
+@parametrize_legacy_dataset
+def test_write_nested_zero_length_array_chunk_failure(use_legacy_dataset):
# Bug report in ARROW-3792
cols = OrderedDict(
int32=pa.int32(),
@@ -3122,11 +3341,12 @@ def test_write_nested_zero_length_array_chunk_failure():
my_batches = [pa.RecordBatch.from_arrays(batch, schema=pa.schema(cols))
for batch in my_arrays]
tbl = pa.Table.from_batches(my_batches, pa.schema(cols))
- _check_roundtrip(tbl)
+ _check_roundtrip(tbl, use_legacy_dataset=use_legacy_dataset)
@pytest.mark.pandas
-def test_partitioned_dataset(tempdir):
+@parametrize_legacy_dataset
+def test_partitioned_dataset(tempdir, use_legacy_dataset):
# ARROW-3208: Segmentation fault when reading a Parquet partitioned dataset
# to a Parquet file
path = tempdir / "ARROW-3208"
@@ -3138,7 +3358,8 @@ def test_partitioned_dataset(tempdir):
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path=str(path),
partition_cols=['one', 'two'])
- table = pq.ParquetDataset(path).read()
+ table = pq.ParquetDataset(
+ path, use_legacy_dataset=use_legacy_dataset).read()
pq.write_table(table, path / "output.parquet")
@@ -3156,7 +3377,8 @@ def test_read_column_invalid_index():
@pytest.mark.pandas
-def test_direct_read_dictionary():
+@parametrize_legacy_dataset_skip_buffer
+def test_direct_read_dictionary(use_legacy_dataset):
# ARROW-3325
repeats = 10
nunique = 5
@@ -3172,7 +3394,8 @@ def test_direct_read_dictionary():
contents = bio.getvalue()
result = pq.read_table(pa.BufferReader(contents),
- read_dictionary=['f0'])
+ read_dictionary=['f0'],
+ use_legacy_dataset=use_legacy_dataset)
# Compute dictionary-encoded subfield
expected = pa.table([table[0].dictionary_encode()], names=['f0'])
@@ -3180,14 +3403,17 @@ def test_direct_read_dictionary():
@pytest.mark.pandas
-def test_dataset_read_dictionary(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
path = tempdir / "ARROW-3325-dataset"
t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
pq.write_to_dataset(t1, root_path=str(path))
pq.write_to_dataset(t2, root_path=str(path))
- result = pq.ParquetDataset(path, read_dictionary=['f0']).read()
+ result = pq.ParquetDataset(
+ path, read_dictionary=['f0'],
+ use_legacy_dataset=use_legacy_dataset).read()
# The order of the chunks is non-deterministic
ex_chunks = [t1[0].chunk(0).dictionary_encode(),
@@ -3203,7 +3429,8 @@ def test_dataset_read_dictionary(tempdir):
@pytest.mark.pandas
-def test_direct_read_dictionary_subfield():
+@parametrize_legacy_dataset_skip_buffer
+def test_direct_read_dictionary_subfield(use_legacy_dataset):
repeats = 10
nunique = 5
@@ -3216,7 +3443,8 @@ def test_direct_read_dictionary_subfield():
pq.write_table(table, bio)
contents = bio.getvalue()
result = pq.read_table(pa.BufferReader(contents),
- read_dictionary=['f0.list.item'])
+ read_dictionary=['f0.list.item'],
+ use_legacy_dataset=use_legacy_dataset)
arr = pa.array(data[0])
values_as_dict = arr.values.dictionary_encode()
@@ -3298,7 +3526,10 @@ def test_write_to_dataset_metadata(tempdir):
assert d1 == d2
-def test_parquet_file_too_small(tempdir):
+# TODO(dataset) better error message for invalid files (certainly if it
+# is the only one)
+@parametrize_legacy_dataset_not_supported
+def test_parquet_file_too_small(tempdir, use_legacy_dataset):
path = str(tempdir / "test.parquet")
with pytest.raises(pa.ArrowInvalid,
match='size is 0 bytes'):
@@ -3314,7 +3545,8 @@ def test_parquet_file_too_small(tempdir):
@pytest.mark.pandas
-def test_categorical_index_survives_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_categorical_index_survives_roundtrip(use_legacy_dataset):
# ARROW-3652, addressed by ARROW-3246
df = pd.DataFrame([['a', 'b'], ['c', 'd']], columns=['c1', 'c2'])
df['c1'] = df['c1'].astype('category')
@@ -3323,13 +3555,15 @@ def test_categorical_index_survives_roundtrip():
table = pa.Table.from_pandas(df)
bos = pa.BufferOutputStream()
pq.write_table(table, bos)
- ref_df = pq.read_pandas(bos.getvalue()).to_pandas()
+ ref_df = pq.read_pandas(
+ bos.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas()
assert isinstance(ref_df.index, pd.CategoricalIndex)
assert ref_df.index.equals(df.index)
@pytest.mark.pandas
-def test_categorical_order_survives_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_categorical_order_survives_roundtrip(use_legacy_dataset):
# ARROW-6302
df = pd.DataFrame({"a": pd.Categorical(
["a", "b", "c", "a"], categories=["b", "c", "d"], ordered=True)})
@@ -3339,7 +3573,8 @@ def test_categorical_order_survives_roundtrip():
pq.write_table(table, bos)
contents = bos.getvalue()
- result = pq.read_pandas(contents).to_pandas()
+ result = pq.read_pandas(
+ contents, use_legacy_dataset=use_legacy_dataset).to_pandas()
tm.assert_frame_equal(result, df)
@@ -3351,7 +3586,8 @@ def _simple_table_write_read(table):
return pq.read_table(pa.BufferReader(contents))
-def test_dictionary_array_automatically_read():
+@parametrize_legacy_dataset_skip_buffer
+def test_dictionary_array_automatically_read(use_legacy_dataset):
# ARROW-3246
# Make a large dictionary, a little over 4MB of data
@@ -3416,7 +3652,8 @@ def test_field_id_metadata():
@pytest.mark.pandas
-def test_pandas_categorical_na_type_row_groups():
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_categorical_na_type_row_groups(use_legacy_dataset):
# ARROW-5085
df = pd.DataFrame({"col": [None] * 100, "int": [1.0] * 100})
df_category = df.astype({"col": "category", "int": "category"})
@@ -3426,7 +3663,8 @@ def test_pandas_categorical_na_type_row_groups():
# it works
pq.write_table(table_cat, buf, version="2.0", chunk_size=10)
- result = pq.read_table(buf.getvalue())
+ result = pq.read_table(
+ buf.getvalue(), use_legacy_dataset=use_legacy_dataset)
# Result is non-categorical
assert result[0].equals(table[0])
@@ -3434,7 +3672,8 @@ def test_pandas_categorical_na_type_row_groups():
@pytest.mark.pandas
-def test_pandas_categorical_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_categorical_roundtrip(use_legacy_dataset):
# ARROW-5480, this was enabled by ARROW-3246
# Have one of the categories unobserved and include a null (-1)
@@ -3446,7 +3685,8 @@ def test_pandas_categorical_roundtrip():
buf = pa.BufferOutputStream()
pq.write_table(pa.table(df), buf)
- result = pq.read_table(buf.getvalue()).to_pandas()
+ result = pq.read_table(
+ buf.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas()
assert result.x.dtype == 'category'
assert (result.x.cat.categories == categories).all()
tm.assert_frame_equal(result, df)
@@ -3495,8 +3735,9 @@ def test_multi_dataset_metadata(tempdir):
assert md['serialized_size'] > 0
+@parametrize_legacy_dataset
@pytest.mark.pandas
-def test_filter_before_validate_schema(tempdir):
+def test_filter_before_validate_schema(tempdir, use_legacy_dataset):
# ARROW-4076 apply filter before schema validation
# to avoid checking unneeded schemas
@@ -3513,7 +3754,8 @@ def test_filter_before_validate_schema(tempdir):
pq.write_table(table2, dir2 / 'data.parquet')
# read single file using filter
- table = pq.read_table(tempdir, filters=[[('A', '==', 0)]])
+ table = pq.read_table(tempdir, filters=[[('A', '==', 0)]],
+ use_legacy_dataset=use_legacy_dataset)
assert table.column('B').equals(pa.chunked_array([[1, 2, 3]]))
@@ -3556,6 +3798,7 @@ def test_fastparquet_cross_compatibility(tempdir):
tm.assert_frame_equal(table_fp.to_pandas(), df)
+@parametrize_legacy_dataset_skip_buffer
@pytest.mark.parametrize('array_factory', [
lambda: pa.array([0, None] * 10),
lambda: pa.array([0, None] * 10).dictionary_encode(),
@@ -3564,7 +3807,9 @@ def test_fastparquet_cross_compatibility(tempdir):
])
@pytest.mark.parametrize('use_dictionary', [False, True])
@pytest.mark.parametrize('read_dictionary', [False, True])
-def test_buffer_contents(array_factory, use_dictionary, read_dictionary):
+def test_buffer_contents(
+ array_factory, use_dictionary, read_dictionary, use_legacy_dataset
+):
# Test that null values are deterministically initialized to zero
# after a roundtrip through Parquet.
# See ARROW-8006 and ARROW-8011.
@@ -3574,9 +3819,66 @@ def test_buffer_contents(array_factory, use_dictionary,
read_dictionary):
bio.seek(0)
read_dictionary = ['col'] if read_dictionary else None
table = pq.read_table(bio, use_threads=False,
- read_dictionary=read_dictionary)
+ read_dictionary=read_dictionary,
+ use_legacy_dataset=use_legacy_dataset)
for col in table.columns:
[chunk] = col.chunks
buf = chunk.buffers()[1]
assert buf.to_pybytes() == buf.size * b"\0"
+
+
[email protected]
+def test_dataset_unsupported_keywords():
+    # ARROW-8039: keywords tied to the legacy Python ParquetDataset
+    # implementation are not implemented on the new datasets-based path,
+    # so requesting them with use_legacy_dataset=False must raise a clear
+    # ValueError instead of being silently ignored.
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, schema=pa.schema([]))
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, metadata=pa.schema([]))
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, validate_schema=False)
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, split_row_groups=True)
+
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.ParquetDataset("", use_legacy_dataset=False, metadata_nthreads=4)
+
+    # read_table funnels into the same code path and shares the restriction
+    with pytest.raises(ValueError, match="not yet supported with the new"):
+        pq.read_table("", use_legacy_dataset=False, metadata=pa.schema([]))
+
+
[email protected]
+def test_dataset_partitioning(tempdir):
+    # The `partitioning` keyword (pyarrow.dataset partitioning spec) is a
+    # feature of the new dataset API only: it is honoured when
+    # use_legacy_dataset=False and rejected with ValueError otherwise.
+    import pyarrow.dataset as ds
+
+    # create small dataset with directory partitioning
+    root_path = tempdir / "test_partitioning"
+    (root_path / "2012" / "10" / "01").mkdir(parents=True)
+
+    table = pa.table({'a': [1, 2, 3]})
+    pq.write_table(
+        table, str(root_path / "2012" / "10" / "01" / "data.parquet"))
+
+    # This works with new dataset API
+
+    # read_table
+    part = ds.partitioning(field_names=["year", "month", "day"])
+    result = pq.read_table(
+        str(root_path), partitioning=part, use_legacy_dataset=False)
+    # partition fields inferred from the directory levels are appended
+    # as extra columns after the file's own columns
+    assert result.column_names == ["a", "year", "month", "day"]
+
+    result = pq.ParquetDataset(
+        str(root_path), partitioning=part, use_legacy_dataset=False).read()
+    assert result.column_names == ["a", "year", "month", "day"]
+
+    # This raises an error for legacy dataset
+    with pytest.raises(ValueError):
+        pq.read_table(
+            str(root_path), partitioning=part, use_legacy_dataset=True)
+
+    with pytest.raises(ValueError):
+        pq.ParquetDataset(
+            str(root_path), partitioning=part, use_legacy_dataset=True)